Refactor project structure

This commit is contained in:
Alexey Zakharenkov 2024-03-05 16:43:20 +03:00
parent c2f2956da1
commit 60821b60d6
77 changed files with 3535 additions and 3181 deletions

View file

@ -27,7 +27,7 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install flake8==6.0.0 black==23.1.0 shapely==2.0.1
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
pip install -r subways/requirements.txt
- name: Lint with flake8
run: |
flake8
@ -36,4 +36,4 @@ jobs:
black --check --line-length 79 .
- name: Test with unittest
run: |
python -m unittest discover tests
python -m unittest discover tests

View file

@ -1,7 +1,7 @@
# Subway Preprocessor
Here you see a list of scripts that can be used for preprocessing all the metro
systems in the world from OpenStreetMap. `subway_structure.py` produces
systems in the world from OpenStreetMap. `scripts/subway_structure.py` produces
a list of disjunct systems that can be used for routing and for displaying
of metro maps.
@ -16,14 +16,14 @@ of metro maps.
2. If you don't specify `--xml` or `--source` option to the `process_subways.py` script
it tries to fetch data over [Overpass API](https://wiki.openstreetmap.org/wiki/Overpass_API).
**Not suitable for the whole planet or large countries.**
* Run `process_subways.py` with appropriate set of command line arguments
* Run `scripts/process_subways.py` with appropriate set of command line arguments
to build metro structures and receive a validation log.
* Run `validation_to_html.py` on that log to create readable HTML tables.
* Run `tools/v2h/validation_to_html.py` on that log to create readable HTML tables.
## Validating of all metro networks
There is a `process_subways.sh` in the `scripts` directory that is suitable
There is a `scripts/process_subways.sh` script that is suitable
for validation of all or many metro networks. It relies on a bunch of
environment variables and takes advantage of previous validation runs
for effective recurring validations. See
@ -51,17 +51,21 @@ a city's bbox has been extended.
## Validating of a single city
A single city or a country with few metro networks can be validated much faster
if you allow the `process_subway.py` to fetch data from Overpass API. Here are the steps:
if you allow the `scripts/process_subways.py` to fetch data from Overpass API. Here are the steps:
1. Python3 interpreter required (3.11+)
2. Clone the repo
```
```bash
git clone https://github.com/alexey-zakharenkov/subways.git subways_validator
cd subways_validator
```
3. Execute
3. Install python dependencies
```bash
pip install -r subways/requirements.txt
```
4. Execute
```bash
python3 ./process_subways.py -c "London" \
python3 scripts/process_subways.py -c "London" \
-l validation.log -d London.yaml
```
here
@ -73,21 +77,21 @@ if you allow the `process_subway.py` to fetch data from Overpass API. Here are t
`validation.log` would contain the list of errors and warnings.
To convert it into pretty HTML format
4. do
5. do
```bash
mkdir html
python3 ./validation_to_html.py validation.log html
python3 tools/v2h/validation_to_html.py validation.log html
```
## Publishing validation reports to the Web
Expose a directory with static contents via a web-server and put into it:
- HTML files from the directory specified in the 2nd parameter of `validation_to_html.py`
- HTML files from the directory specified in the 2nd parameter of `tools/v2h/validation_to_html.py`
- To vitalize "Y" (YAML), "J" (GeoJSON) and "M" (Map) links beside each city name:
- The contents of `render` directory from the repository
- `cities.txt` file generated with `--dump-city-list` parameter of `process_subways.py`
- YAML files created due to -d option of `process_subways.py`
- GeoJSON files created due to -j option of `process_subways.py`
- `cities.txt` file generated with `--dump-city-list` parameter of `scripts/process_subways.py`
- YAML files created due to -d option of `scripts/process_subways.py`
- GeoJSON files created due to -j option of `scripts/process_subways.py`
## Related external resources
@ -103,9 +107,9 @@ You can find more info about this validator instance in
## Adding Stop Areas To OSM
To quickly add `stop_area` relations for the entire city, use the `make_stop_areas.py` script
from the `stop_area` directory. Give it a bounding box or a `.json` file download from Overpass API.
It would produce an JOSM XML file that you should manually check in JOSM. After that
To quickly add `stop_area` relations for the entire city, use the `tools/stop_areas/make_stop_areas.py` script.
Give it a bounding box or a `.json` file downloaded from Overpass API.
It would produce a JOSM XML file that you should manually check in JOSM. After that
just upload it.
## Author and License

View file

@ -1,577 +0,0 @@
#!/usr/bin/env python3
import argparse
import csv
import inspect
import json
import logging
import os
import re
import sys
import time
import urllib.parse
import urllib.request
from functools import partial
import processors
from subway_io import (
dump_yaml,
load_xml,
make_geojson,
read_recovery_data,
write_recovery_data,
)
from subway_structure import (
City,
CriticalValidationError,
find_transfers,
get_unused_subway_entrances_geojson,
LonLat,
MODES_OVERGROUND,
MODES_RAPID,
OsmElementT,
)
DEFAULT_SPREADSHEET_ID = "1SEW1-NiNOnA2qDwievcxYV1FOaQl1mb1fdeyqAxHu3k"
DEFAULT_CITIES_INFO_URL = (
"https://docs.google.com/spreadsheets/d/"
f"{DEFAULT_SPREADSHEET_ID}/export?format=csv"
)
BAD_MARK = "[bad]"
def compose_overpass_request(
    overground: bool, bboxes: list[list[float]]
) -> str:
    """Build the Overpass QL query text for the given bounding boxes.

    :param overground: select MODES_OVERGROUND route types instead of
        MODES_RAPID; entrance nodes are requested only when this is False
    :param bboxes: non-empty list of bboxes; the coordinates of each bbox
        are joined with commas, in the given order, into the query filter
    :return: the complete query string
    :raises RuntimeError: if bboxes is empty
    """
    if not bboxes:
        raise RuntimeError("No bboxes given for overpass request")

    route_modes = sorted(MODES_OVERGROUND if overground else MODES_RAPID)

    chunks = ["[out:json][timeout:1000];("]
    for bbox in bboxes:
        area = f"({','.join(map(str, bbox))})"
        routes = "".join(
            f'rel[route="{mode}"]{area};' for mode in route_modes
        )
        chunks.append(f"({routes});")
        chunks.append("rel(br)[type=route_master];")
        if not overground:
            chunks.append(f"node[railway=subway_entrance]{area};")
            chunks.append(f"node[railway=train_station_entrance]{area};")
        chunks.append(f"rel[public_transport=stop_area]{area};")
        chunks.append(
            "rel(br)[type=public_transport][public_transport=stop_area_group];"
        )
    chunks.append(");(._;>>;);out body center qt;")

    query = "".join(chunks)
    logging.debug("Query: %s", query)
    return query
def overpass_request(
    overground: bool, overpass_api: str, bboxes: list[list[float]]
) -> list[OsmElementT]:
    """Run a single Overpass API query and return the OSM elements.

    :param overground: passed through to compose_overpass_request()
    :param overpass_api: URL of the Overpass API interpreter endpoint
    :param bboxes: bounding boxes to query; must not be empty
    :return: the "elements" list of the JSON response
    :raises Exception: if the HTTP status code is not 200
    """
    query = compose_overpass_request(overground, bboxes)
    url = f"{overpass_api}?data={urllib.parse.quote(query)}"
    # The context manager closes the HTTP response (and its socket) even
    # when the status check or JSON parsing raises.
    with urllib.request.urlopen(url, timeout=1000) as response:
        if (r_code := response.getcode()) != 200:
            raise Exception(f"Failed to query Overpass API: HTTP {r_code}")
        return json.load(response)["elements"]
def multi_overpass(
    overground: bool, overpass_api: str, bboxes: list[list[float]]
) -> list[OsmElementT]:
    """Query Overpass API for many bboxes in batches of ten.

    Sleeps for five seconds between consecutive requests.

    :return: concatenation of the elements returned by each request
    """
    batch_size = 10
    pause_between_requests = 5  # in seconds

    batches = [
        bboxes[start : start + batch_size]  # noqa E203
        for start in range(0, len(bboxes), batch_size)
    ]

    elements = []
    for batch_no, batch in enumerate(batches):
        if batch_no:
            time.sleep(pause_between_requests)
        elements += overpass_request(overground, overpass_api, batch)
    return elements
def slugify(name: str) -> str:
    """Turn a name into a slug of lowercase ASCII letters, digits, "_", "-".

    Spaces become underscores; every other character outside
    ``[a-z0-9_-]`` is dropped.
    """
    underscored = name.lower().replace(" ", "_")
    return re.sub(r"[^a-z0-9_-]+", "", underscored)
def get_way_center(
    element: OsmElementT, node_centers: dict[int, LonLat]
) -> LonLat | None:
    """Return the (lon, lat) center of a way, computing it if necessary.

    A computed center is the arithmetic mean of the coordinates of the
    way's nodes found in node_centers; it is also stored in
    element["center"] as a {"lat": ..., "lon": ...} dict.

    :param element: dict describing OSM element
    :param node_centers: osm_id => (lon, lat)
    :return: tuple with center coordinates, or None if the element has no
        "nodes" key or none of its nodes is present in node_centers
    """
    # Elements queried via overpass-api with the 'out center;' clause
    # already carry a 'center' attribute.
    if "center" in element:
        return element["center"]["lon"], element["center"]["lat"]

    if "nodes" not in element:
        return None

    refs = element["nodes"]
    # A closed way repeats its first node at the end; count it once.
    if len(refs) > 1 and refs[0] == refs[-1]:
        refs = refs[:-1]

    known = [node_centers[ref] for ref in refs if ref in node_centers]
    if not known:
        return None

    total = len(known)
    lon = sum(point[0] for point in known) / total
    lat = sum(point[1] for point in known) / total
    element["center"] = {"lat": lat, "lon": lon}
    return element["center"]["lon"], element["center"]["lat"]
def get_relation_center(
    element: OsmElementT,
    node_centers: dict[int, LonLat],
    way_centers: dict[int, LonLat],
    relation_centers: dict[int, LonLat],
    ignore_unlocalized_child_relations: bool = False,
) -> LonLat | None:
    """Return the (lon, lat) center of a relation, computing it if needed.

    A computed center is the arithmetic mean of the known centers of the
    relation's members; it is also stored in element["center"].

    :param element: dict describing OSM element
    :param node_centers: osm_id => LonLat
    :param way_centers: osm_id => LonLat
    :param relation_centers: osm_id => LonLat
    :param ignore_unlocalized_child_relations: if a member that is a relation
        has no center, skip it and calculate center based on member nodes,
        ways and other, "localized" (with known centers), relations
    :return: tuple with center coordinates, or None
    """
    # With the overpass-api 'out center;' clause some relations already
    # have a 'center' attribute. This is not the case for relations made
    # only of other relations (e.g., route_master, stop_area_group or
    # stop_area with only members that are multipolygons).
    if "center" in element:
        return element["center"]["lon"], element["center"]["lat"]

    centers_by_type = {"node": node_centers, "way": way_centers}
    lon_sum = 0
    lat_sum = 0
    found = 0

    for member in element.get("members", []):
        ref = member["ref"]
        kind = member["type"]

        if kind == "relation" and ref not in relation_centers:
            if not ignore_unlocalized_child_relations:
                # A fair center cannot be calculated while the center
                # of a child relation is still unknown.
                return None
            continue

        # Any member type other than node/way is looked up as a relation.
        lookup = centers_by_type.get(kind, relation_centers)
        if ref not in lookup:
            continue
        lon_sum += lookup[ref][0]
        lat_sum += lookup[ref][1]
        found += 1

    if not found:
        return None

    element["center"] = {"lat": lat_sum / found, "lon": lon_sum / found}
    return element["center"]["lon"], element["center"]["lat"]
def calculate_centers(elements: list[OsmElementT]) -> None:
    """Add a 'center' key to each way/relation in elements.

    Empty ways or relations (those for which no center can be derived)
    are left without the key. Relies on nodes-ways-relations order in
    the elements list.
    """
    node_centers: dict[int, LonLat] = {}
    way_centers: dict[int, LonLat] = {}  # approximate centers
    relation_centers: dict[int, LonLat] = {}  # approximate centers
    # Relations whose center has not been calculated yet
    pending: list[OsmElementT] = []

    for el in elements:
        kind = el["type"]
        if kind == "node":
            node_centers[el["id"]] = (el["lon"], el["lat"])
        elif kind == "way":
            center = get_way_center(el, node_centers)
            if center:
                way_centers[el["id"]] = center
        elif kind == "relation":
            center = get_relation_center(
                el, node_centers, way_centers, relation_centers
            )
            if center:
                relation_centers[el["id"]] = center
            else:
                pending.append(el)

    def resolve_pass(
        candidates: list[OsmElementT], lenient: bool
    ) -> list[OsmElementT]:
        """Try to localize each candidate; return those still unresolved."""
        unresolved = []
        for rel in candidates:
            center = get_relation_center(
                rel, node_centers, way_centers, relation_centers, lenient
            )
            if center:
                relation_centers[rel["id"]] = center
            else:
                unresolved.append(rel)
        return unresolved

    # Repeat until every relation is localized or a full round (strict,
    # then lenient about unlocalized child relations) makes no progress.
    while pending:
        remaining = resolve_pass(pending, False)
        if len(remaining) >= len(pending):
            remaining = resolve_pass(pending, True)
            if len(remaining) >= len(pending):
                break
        pending = remaining
def add_osm_elements_to_cities(
    osm_elements: list[OsmElementT], cities: list[City]
) -> None:
    """Add each OSM element to every city that reports containing it."""
    for element in osm_elements:
        for city in cities:
            if not city.contains(element):
                continue
            city.add(element)
def validate_cities(cities: list[City]) -> list[City]:
    """Validate cities. Return list of good cities.

    For each city routes are extracted first. A CriticalValidationError
    or AssertionError raised by that step is logged, recorded on the city
    via city.error(), and the city is skipped. Otherwise the city is
    validated and, if it reports is_good, its distances are calculated
    and it is included in the result.
    """
    good_cities = []
    for city in cities:
        try:
            city.extract_routes()
        except CriticalValidationError as err:
            logging.error(
                "Critical validation error while processing %s: %s",
                city.name,
                err,
            )
            city.error(str(err))
            continue
        except AssertionError as err:
            logging.error(
                "Validation logic error while processing %s: %s",
                city.name,
                err,
            )
            city.error(f"Validation logic error: {err}")
            continue

        city.validate()
        if not city.is_good:
            continue
        city.calculate_distances()
        good_cities.append(city)

    return good_cities
def get_cities_info(
    cities_info_url: str = DEFAULT_CITIES_INFO_URL,
) -> list[dict]:
    """Download and parse the CSV with reference data about cities.

    The first CSV row is treated as a header and skipped. Only rows with
    non-empty "id" and "bbox" columns are returned; a warning is logged
    when a city name occurs more than once among them.

    :param cities_info_url: URL of the CSV file; the HTTP status code is
        not checked for "file://" URLs
    :return: list of dicts keyed by the fixed column names below
    :raises Exception: if a non-file URL answers with a status other
        than 200
    """
    # The context manager closes the response even if the status check
    # or decoding fails.
    with urllib.request.urlopen(cities_info_url) as response:
        if (
            not cities_info_url.startswith("file://")
            and (r_code := response.getcode()) != 200
        ):
            raise Exception(
                f"Failed to download cities spreadsheet: HTTP {r_code}"
            )
        data = response.read().decode("utf-8")

    reader = csv.DictReader(
        data.splitlines(),
        fieldnames=(
            "id",
            "name",
            "country",
            "continent",
            "num_stations",
            "num_lines",
            "num_light_lines",
            "num_interchanges",
            "bbox",
            "networks",
        ),
    )

    cities_info = []
    names = set()
    # Skip the header; the default keeps an empty file from raising
    # StopIteration, so it yields an empty list instead.
    next(reader, None)
    for city_info in reader:
        if city_info["id"] and city_info["bbox"]:
            cities_info.append(city_info)
            name = city_info["name"].strip()
            if name in names:
                logging.warning(
                    "Duplicate city name in city list: %s",
                    city_info,
                )
            names.add(name)
    return cities_info
def prepare_cities(
    cities_info_url: str = DEFAULT_CITIES_INFO_URL, overground: bool = False
) -> list[City]:
    """Create a City object for every row returned by get_cities_info().

    :raises NotImplementedError: if overground is true
    """
    if overground:
        raise NotImplementedError("Overground transit not implemented yet")
    return [
        City(city_info, overground=overground)
        for city_info in get_cities_info(cities_info_url)
    ]
def main() -> None:
    """Command-line entry point: load OSM data, validate cities, export.

    OSM data comes from the --source JSON file if it exists, else from
    the --xml extract, else from Overpass API. Depending on the options
    given, the city list, recovery data, unused entrances, YAML/GeoJSON
    dumps, the validation log and processor outputs are then written.

    Exits with status 2 if no city is left to process and with status 3
    if more than 10 cities would have to be fetched from Overpass API.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--cities-info-url",
        default=DEFAULT_CITIES_INFO_URL,
        help=(
            "URL of CSV file with reference information about rapid transit "
            "networks. file:// protocol is also supported."
        ),
    )
    parser.add_argument(
        "-i",
        "--source",
        help="File to write backup of OSM data, or to read data from",
    )
    parser.add_argument(
        "-x", "--xml", help="OSM extract with routes, to read data from"
    )
    parser.add_argument(
        "--overpass-api",
        default="http://overpass-api.de/api/interpreter",
        help="Overpass API URL",
    )
    parser.add_argument(
        "-q",
        "--quiet",
        action="store_true",
        help="Show only warnings and errors",
    )
    parser.add_argument(
        "-c", "--city", help="Validate only a single city or a country"
    )
    parser.add_argument(
        "-t",
        "--overground",
        action="store_true",
        help="Process overground transport instead of subways",
    )
    parser.add_argument(
        "-e",
        "--entrances",
        type=argparse.FileType("w", encoding="utf-8"),
        help="Export unused subway entrances as GeoJSON here",
    )
    parser.add_argument(
        "-l",
        "--log",
        type=argparse.FileType("w", encoding="utf-8"),
        help="Validation JSON file name",
    )
    parser.add_argument(
        "--dump-city-list",
        type=argparse.FileType("w", encoding="utf-8"),
        help=(
            "Dump sorted list of all city names, possibly with "
            f"{BAD_MARK} mark"
        ),
    )

    # One --output-<name> option per public module found in `processors`.
    for processor_name, processor in inspect.getmembers(
        processors, inspect.ismodule
    ):
        if not processor_name.startswith("_"):
            parser.add_argument(
                f"--output-{processor_name}",
                help=(
                    "Processed metro systems output filename "
                    f"in {processor_name.upper()} format"
                ),
            )

    parser.add_argument("--cache", help="Cache file name for processed data")
    parser.add_argument(
        "-r", "--recovery-path", help="Cache file name for error recovery"
    )
    parser.add_argument(
        "-d", "--dump", help="Make a YAML file for a city data"
    )
    parser.add_argument(
        "-j", "--geojson", help="Make a GeoJSON file for a city data"
    )
    parser.add_argument(
        "--crude",
        action="store_true",
        help="Do not use OSM railway geometry for GeoJSON",
    )
    options = parser.parse_args()

    log_level = logging.WARNING if options.quiet else logging.INFO
    logging.basicConfig(
        level=log_level,
        datefmt="%H:%M:%S",
        format="%(asctime)s %(levelname)-7s %(message)s",
    )

    cities = prepare_cities(options.cities_info_url, options.overground)
    if options.city:
        # --city matches either a city name or a country name
        cities = [
            c
            for c in cities
            if c.name == options.city or c.country == options.city
        ]
    if not cities:
        logging.error("No cities to process")
        sys.exit(2)

    # Augment cities with recovery data
    recovery_data = None
    if options.recovery_path:
        recovery_data = read_recovery_data(options.recovery_path)
        for city in cities:
            city.recovery_data = recovery_data.get(city.name, None)

    logging.info("Read %s metro networks", len(cities))

    # Reading cached json, loading XML or querying Overpass API
    if options.source and os.path.exists(options.source):
        logging.info("Reading %s", options.source)
        # Read explicitly as UTF-8: this is the encoding the backup is
        # written with below, and the locale default may differ.
        with open(options.source, "r", encoding="utf-8") as f:
            osm = json.load(f)
        # Accept both a bare element list and a dict with "elements".
        if "elements" in osm:
            osm = osm["elements"]
        calculate_centers(osm)
    elif options.xml:
        logging.info("Reading %s", options.xml)
        osm = load_xml(options.xml)
        calculate_centers(osm)
        if options.source:
            with open(options.source, "w", encoding="utf-8") as f:
                json.dump(osm, f)
    else:
        if len(cities) > 10:
            logging.error(
                "Would not download that many cities from Overpass API, "
                "choose a smaller set"
            )
            sys.exit(3)
        bboxes = [c.bbox for c in cities]
        logging.info("Downloading data from Overpass API")
        osm = multi_overpass(options.overground, options.overpass_api, bboxes)
        calculate_centers(osm)
        if options.source:
            with open(options.source, "w", encoding="utf-8") as f:
                json.dump(osm, f)
    logging.info("Downloaded %s elements", len(osm))

    logging.info("Sorting elements by city")
    add_osm_elements_to_cities(osm, cities)

    logging.info("Building routes for each city")
    good_cities = validate_cities(cities)

    logging.info("Finding transfer stations")
    transfers = find_transfers(osm, good_cities)

    good_city_names = set(c.name for c in good_cities)
    logging.info(
        "%s good cities: %s",
        len(good_city_names),
        ", ".join(sorted(good_city_names)),
    )
    bad_city_names = set(c.name for c in cities) - good_city_names
    logging.info(
        "%s bad cities: %s",
        len(bad_city_names),
        ", ".join(sorted(bad_city_names)),
    )

    if options.dump_city_list:
        lines = sorted(
            f"{city.name}, {city.country}"
            f"{' ' + BAD_MARK if city.name in bad_city_names else ''}\n"
            for city in cities
        )
        options.dump_city_list.writelines(lines)

    if options.recovery_path:
        write_recovery_data(options.recovery_path, recovery_data, cities)

    if options.entrances:
        json.dump(get_unused_subway_entrances_geojson(osm), options.entrances)

    # --dump is either a directory (one <slug>.yaml per city) or a file
    # name, which is only usable when exactly one city is processed.
    if options.dump:
        if os.path.isdir(options.dump):
            for c in cities:
                with open(
                    os.path.join(options.dump, slugify(c.name) + ".yaml"),
                    "w",
                    encoding="utf-8",
                ) as f:
                    dump_yaml(c, f)
        elif len(cities) == 1:
            with open(options.dump, "w", encoding="utf-8") as f:
                dump_yaml(cities[0], f)
        else:
            logging.error("Cannot dump %s cities at once", len(cities))

    # --geojson follows the same directory-or-single-file convention.
    if options.geojson:
        if os.path.isdir(options.geojson):
            for c in cities:
                with open(
                    os.path.join(
                        options.geojson, slugify(c.name) + ".geojson"
                    ),
                    "w",
                    encoding="utf-8",
                ) as f:
                    json.dump(make_geojson(c, not options.crude), f)
        elif len(cities) == 1:
            with open(options.geojson, "w", encoding="utf-8") as f:
                json.dump(make_geojson(cities[0], not options.crude), f)
        else:
            logging.error(
                "Cannot make a geojson of %s cities at once", len(cities)
            )

    if options.log:
        res = []
        for c in cities:
            v = c.get_validation_result()
            v["slug"] = slugify(c.name)
            res.append(v)
        json.dump(res, options.log, indent=2, ensure_ascii=False)

    # Run every processor whose --output-<name> option was given.
    for processor_name, processor in inspect.getmembers(
        processors, inspect.ismodule
    ):
        option_name = f"output_{processor_name}"
        if not (filename := getattr(options, option_name, None)):
            continue
        processor.process(cities, transfers, filename, options.cache)
if __name__ == "__main__":
main()

276
scripts/process_subways.py Executable file
View file

@ -0,0 +1,276 @@
import argparse
import inspect
import json
import logging
import os
import re
import sys
from subways import processors
from subways.overpass import multi_overpass
from subways.subway_io import (
dump_yaml,
load_xml,
make_geojson,
read_recovery_data,
write_recovery_data,
)
from subways.structure.city import (
find_transfers,
get_unused_subway_entrances_geojson,
)
from subways.validation import (
add_osm_elements_to_cities,
BAD_MARK,
calculate_centers,
DEFAULT_CITIES_INFO_URL,
prepare_cities,
validate_cities,
)
def slugify(name: str) -> str:
    """Turn a name into a slug of lowercase ASCII letters, digits, "_", "-".

    Spaces become underscores; every other character outside
    ``[a-z0-9_-]`` is dropped.
    """
    underscored = name.lower().replace(" ", "_")
    return re.sub(r"[^a-z0-9_-]+", "", underscored)
def main() -> None:
    """Command-line entry point: load OSM data, validate cities, export.

    OSM data comes from the --source JSON file if it exists, else from
    the --xml extract, else from Overpass API. Depending on the options
    given, the city list, recovery data, unused entrances, YAML/GeoJSON
    dumps, the validation log and processor outputs are then written.

    Exits with status 2 if no city is left to process and with status 3
    if more than 10 cities would have to be fetched from Overpass API.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--cities-info-url",
        default=DEFAULT_CITIES_INFO_URL,
        help=(
            "URL of CSV file with reference information about rapid transit "
            "networks. file:// protocol is also supported."
        ),
    )
    parser.add_argument(
        "-i",
        "--source",
        help="File to write backup of OSM data, or to read data from",
    )
    parser.add_argument(
        "-x", "--xml", help="OSM extract with routes, to read data from"
    )
    parser.add_argument(
        "--overpass-api",
        default="http://overpass-api.de/api/interpreter",
        help="Overpass API URL",
    )
    parser.add_argument(
        "-q",
        "--quiet",
        action="store_true",
        help="Show only warnings and errors",
    )
    parser.add_argument(
        "-c", "--city", help="Validate only a single city or a country"
    )
    parser.add_argument(
        "-t",
        "--overground",
        action="store_true",
        help="Process overground transport instead of subways",
    )
    parser.add_argument(
        "-e",
        "--entrances",
        type=argparse.FileType("w", encoding="utf-8"),
        help="Export unused subway entrances as GeoJSON here",
    )
    parser.add_argument(
        "-l",
        "--log",
        type=argparse.FileType("w", encoding="utf-8"),
        help="Validation JSON file name",
    )
    parser.add_argument(
        "--dump-city-list",
        type=argparse.FileType("w", encoding="utf-8"),
        help=(
            "Dump sorted list of all city names, possibly with "
            f"{BAD_MARK} mark"
        ),
    )

    # One --output-<name> option per public module found in `processors`.
    for processor_name, processor in inspect.getmembers(
        processors, inspect.ismodule
    ):
        if not processor_name.startswith("_"):
            parser.add_argument(
                f"--output-{processor_name}",
                help=(
                    "Processed metro systems output filename "
                    f"in {processor_name.upper()} format"
                ),
            )

    parser.add_argument("--cache", help="Cache file name for processed data")
    parser.add_argument(
        "-r", "--recovery-path", help="Cache file name for error recovery"
    )
    parser.add_argument(
        "-d", "--dump", help="Make a YAML file for a city data"
    )
    parser.add_argument(
        "-j", "--geojson", help="Make a GeoJSON file for a city data"
    )
    parser.add_argument(
        "--crude",
        action="store_true",
        help="Do not use OSM railway geometry for GeoJSON",
    )
    options = parser.parse_args()

    log_level = logging.WARNING if options.quiet else logging.INFO
    logging.basicConfig(
        level=log_level,
        datefmt="%H:%M:%S",
        format="%(asctime)s %(levelname)-7s %(message)s",
    )

    cities = prepare_cities(options.cities_info_url, options.overground)
    if options.city:
        # --city matches either a city name or a country name
        cities = [
            c
            for c in cities
            if c.name == options.city or c.country == options.city
        ]
    if not cities:
        logging.error("No cities to process")
        sys.exit(2)

    # Augment cities with recovery data
    recovery_data = None
    if options.recovery_path:
        recovery_data = read_recovery_data(options.recovery_path)
        for city in cities:
            city.recovery_data = recovery_data.get(city.name, None)

    logging.info("Read %s metro networks", len(cities))

    # Reading cached json, loading XML or querying Overpass API
    if options.source and os.path.exists(options.source):
        logging.info("Reading %s", options.source)
        # Read explicitly as UTF-8: this is the encoding the backup is
        # written with below, and the locale default may differ.
        with open(options.source, "r", encoding="utf-8") as f:
            osm = json.load(f)
        # Accept both a bare element list and a dict with "elements".
        if "elements" in osm:
            osm = osm["elements"]
        calculate_centers(osm)
    elif options.xml:
        logging.info("Reading %s", options.xml)
        osm = load_xml(options.xml)
        calculate_centers(osm)
        if options.source:
            with open(options.source, "w", encoding="utf-8") as f:
                json.dump(osm, f)
    else:
        if len(cities) > 10:
            logging.error(
                "Would not download that many cities from Overpass API, "
                "choose a smaller set"
            )
            sys.exit(3)
        bboxes = [c.bbox for c in cities]
        logging.info("Downloading data from Overpass API")
        osm = multi_overpass(options.overground, options.overpass_api, bboxes)
        calculate_centers(osm)
        if options.source:
            with open(options.source, "w", encoding="utf-8") as f:
                json.dump(osm, f)
    logging.info("Downloaded %s elements", len(osm))

    logging.info("Sorting elements by city")
    add_osm_elements_to_cities(osm, cities)

    logging.info("Building routes for each city")
    good_cities = validate_cities(cities)

    logging.info("Finding transfer stations")
    transfers = find_transfers(osm, good_cities)

    good_city_names = set(c.name for c in good_cities)
    logging.info(
        "%s good cities: %s",
        len(good_city_names),
        ", ".join(sorted(good_city_names)),
    )
    bad_city_names = set(c.name for c in cities) - good_city_names
    logging.info(
        "%s bad cities: %s",
        len(bad_city_names),
        ", ".join(sorted(bad_city_names)),
    )

    if options.dump_city_list:
        lines = sorted(
            f"{city.name}, {city.country}"
            f"{' ' + BAD_MARK if city.name in bad_city_names else ''}\n"
            for city in cities
        )
        options.dump_city_list.writelines(lines)

    if options.recovery_path:
        write_recovery_data(options.recovery_path, recovery_data, cities)

    if options.entrances:
        json.dump(get_unused_subway_entrances_geojson(osm), options.entrances)

    # --dump is either a directory (one <slug>.yaml per city) or a file
    # name, which is only usable when exactly one city is processed.
    if options.dump:
        if os.path.isdir(options.dump):
            for c in cities:
                with open(
                    os.path.join(options.dump, slugify(c.name) + ".yaml"),
                    "w",
                    encoding="utf-8",
                ) as f:
                    dump_yaml(c, f)
        elif len(cities) == 1:
            with open(options.dump, "w", encoding="utf-8") as f:
                dump_yaml(cities[0], f)
        else:
            logging.error("Cannot dump %s cities at once", len(cities))

    # --geojson follows the same directory-or-single-file convention.
    if options.geojson:
        if os.path.isdir(options.geojson):
            for c in cities:
                with open(
                    os.path.join(
                        options.geojson, slugify(c.name) + ".geojson"
                    ),
                    "w",
                    encoding="utf-8",
                ) as f:
                    json.dump(make_geojson(c, not options.crude), f)
        elif len(cities) == 1:
            with open(options.geojson, "w", encoding="utf-8") as f:
                json.dump(make_geojson(cities[0], not options.crude), f)
        else:
            logging.error(
                "Cannot make a geojson of %s cities at once", len(cities)
            )

    if options.log:
        res = []
        for c in cities:
            v = c.get_validation_result()
            v["slug"] = slugify(c.name)
            res.append(v)
        json.dump(res, options.log, indent=2, ensure_ascii=False)

    # Run every processor whose --output-<name> option was given.
    for processor_name, processor in inspect.getmembers(
        processors, inspect.ismodule
    ):
        option_name = f"output_{processor_name}"
        if not (filename := getattr(options, option_name, None)):
            continue
        processor.process(cities, transfers, filename, options.cache)
if __name__ == "__main__":
main()

View file

@ -94,7 +94,7 @@ function check_poly() {
if [ -n "$("$PYTHON" -c "import shapely" 2>&1)" ]; then
"$PYTHON" -m pip install shapely==2.0.1
fi
"$PYTHON" "$SUBWAYS_PATH"/make_all_metro_poly.py \
"$PYTHON" "$SUBWAYS_REPO_PATH"/tools/make_poly/make_all_metro_poly.py \
${CITIES_INFO_URL:+--cities-info-url "$CITIES_INFO_URL"} > "$POLY"
fi
fi
@ -107,13 +107,15 @@ PYTHON=${PYTHON:-python3}
# This will fail if there is no python
"$PYTHON" --version > /dev/null
SUBWAYS_PATH="$(dirname "$0")/.."
if [ ! -f "$SUBWAYS_PATH/process_subways.py" ]; then
# "readlink -f" echoes canonicalized absolute path to a file/directory.
# The inner command substitution is quoted so that a script location
# containing spaces is not split into several arguments.
SUBWAYS_REPO_PATH="$(readlink -f "$(dirname "$0")/..")"

if [ ! -f "$SUBWAYS_REPO_PATH/scripts/process_subways.py" ]; then
  # Report the variable that was actually checked; SUBWAYS_PATH is no
  # longer assigned here after the rename to SUBWAYS_REPO_PATH.
  echo "Please clone the subways repo to $SUBWAYS_REPO_PATH"
  exit 2
fi
TMPDIR="${TMPDIR:-$SUBWAYS_PATH}"
TMPDIR="${TMPDIR:-$SUBWAYS_REPO_PATH}"
mkdir -p "$TMPDIR"
# Downloading the latest version of the subways script
@ -242,7 +244,7 @@ if [ -n "${DUMP-}" ]; then
fi
VALIDATION="$TMPDIR/validation.json"
"$PYTHON" "$SUBWAYS_PATH/process_subways.py" ${QUIET:+-q} \
"$PYTHON" "$SUBWAYS_REPO_PATH/scripts/process_subways.py" ${QUIET:+-q} \
-x "$FILTERED_DATA" -l "$VALIDATION" \
${CITIES_INFO_URL:+--cities-info-url "$CITIES_INFO_URL"} \
${MAPSME:+--output-mapsme "$MAPSME"} \
@ -262,13 +264,13 @@ fi
# Preparing HTML files
if [ -z "${HTML_DIR-}" ]; then
HTML_DIR="$SUBWAYS_PATH/html"
HTML_DIR="$SUBWAYS_REPO_PATH/html"
REMOVE_HTML=1
fi
mkdir -p $HTML_DIR
rm -f "$HTML_DIR"/*.html
"$PYTHON" "$SUBWAYS_PATH/validation_to_html.py" \
"$PYTHON" "$SUBWAYS_REPO_PATH/tools/v2h/validation_to_html.py" \
${CITIES_INFO_URL:+--cities-info-url "$CITIES_INFO_URL"} \
"$VALIDATION" "$HTML_DIR"

File diff suppressed because it is too large Load diff

92
subways/__init__.py Normal file
View file

@ -0,0 +1,92 @@
from .consts import (
ALL_MODES,
CONSTRUCTION_KEYS,
DEFAULT_MODES_RAPID,
DEFAULT_MODES_OVERGROUND,
DISPLACEMENT_TOLERANCE,
MAX_DISTANCE_STOP_TO_LINE,
MODES_OVERGROUND,
MODES_RAPID,
RAILWAY_TYPES,
)
from .css_colours import normalize_colour
from .geom_utils import (
angle_between,
distance,
distance_on_line,
find_segment,
is_near,
project_on_line,
)
from .osm_element import el_center, el_id
from .overpass import multi_overpass, overpass_request
from .subway_io import (
dump_yaml,
load_xml,
make_geojson,
read_recovery_data,
write_recovery_data,
)
from .types import (
CriticalValidationError,
IdT,
LonLat,
OsmElementT,
RailT,
TransferT,
TransfersT,
)
from .validation import (
add_osm_elements_to_cities,
BAD_MARK,
calculate_centers,
DEFAULT_CITIES_INFO_URL,
DEFAULT_SPREADSHEET_ID,
get_cities_info,
prepare_cities,
validate_cities,
)
# Names re-exported by the package. Entries are grouped by the submodule
# they are imported from at the top of this file.
__all__ = [
    # .consts
    "ALL_MODES",
    "CONSTRUCTION_KEYS",
    "DEFAULT_MODES_RAPID",
    "DEFAULT_MODES_OVERGROUND",
    "DISPLACEMENT_TOLERANCE",
    "MAX_DISTANCE_STOP_TO_LINE",
    "MODES_OVERGROUND",
    "MODES_RAPID",
    "RAILWAY_TYPES",
    # .geom_utils
    "angle_between",
    "distance",
    "distance_on_line",
    "find_segment",
    "is_near",
    "project_on_line",
    # .css_colours
    "normalize_colour",
    # .osm_element
    "el_center",
    "el_id",
    # .overpass
    "overpass_request",
    "multi_overpass",
    # .subway_io
    "dump_yaml",
    "load_xml",
    "make_geojson",
    "read_recovery_data",
    "write_recovery_data",
    # .types
    "CriticalValidationError",
    "IdT",
    "LonLat",
    "OsmElementT",
    "RailT",
    "TransferT",
    "TransfersT",
    # .validation
    "add_osm_elements_to_cities",
    "BAD_MARK",
    "calculate_centers",
    "DEFAULT_CITIES_INFO_URL",
    "DEFAULT_SPREADSHEET_ID",
    "get_cities_info",
    "prepare_cities",
    "validate_cities",
]

26
subways/consts.py Normal file
View file

@ -0,0 +1,26 @@
# NOTE(review): judging by the name, the maximum allowed distance between
# a stop and its line — confirm against the usages.
MAX_DISTANCE_STOP_TO_LINE = 50  # in meters
# If an object was moved not too far compared to previous validator run,
# it is likely the same object
DISPLACEMENT_TOLERANCE = 300  # in meters
# Two disjoint sets of mode names.
# NOTE(review): presumably values of the OSM "route" tag — confirm.
MODES_RAPID = {"subway", "light_rail", "monorail", "train"}
MODES_OVERGROUND = {"tram", "bus", "trolleybus", "aerialway", "ferry"}
# Subsets of the two sets above.
# NOTE(review): presumably the modes used when a city row lists none.
DEFAULT_MODES_RAPID = {"subway", "light_rail"}
DEFAULT_MODES_OVERGROUND = {"tram"}  # TODO: bus and trolleybus?
# Union of the rapid and overground mode sets
ALL_MODES = MODES_RAPID | MODES_OVERGROUND
# NOTE(review): presumably accepted values of the OSM "railway" tag for
# track ways — confirm against the usages.
RAILWAY_TYPES = {
"rail",
"light_rail",
"subway",
"narrow_gauge",
"funicular",
"monorail",
"tram",
}
# NOTE(review): presumably tag keys that mark an object as not yet
# operational (under construction or proposed) — confirm.
CONSTRUCTION_KEYS = (
"construction",
"proposed",
"construction:railway",
"proposed:railway",
)

175
subways/geom_utils.py Normal file
View file

@ -0,0 +1,175 @@
import math
from subways.consts import MAX_DISTANCE_STOP_TO_LINE
from subways.types import LonLat, RailT
def distance(p1: LonLat, p2: LonLat) -> float:
    """Return the approximate distance between two (lon, lat) points.

    The longitude difference is scaled by the cosine of the mean latitude
    and the result is multiplied by a sphere radius of 6378137.

    :raises Exception: if either point is None
    """
    if p1 is None or p2 is None:
        raise Exception(f"One of arguments to distance({p1}, {p2}) is None")

    mean_lat = 0.5 * math.radians(p1[1] + p2[1])
    d_lon = math.radians(p1[0] - p2[0]) * math.cos(mean_lat)
    d_lat = math.radians(p1[1] - p2[1])
    return 6378137 * math.sqrt(d_lon * d_lon + d_lat * d_lat)
def is_near(p1: LonLat, p2: LonLat) -> bool:
    """Tell whether both coordinates of p2 lie within 1e-8 of those of p1."""
    tolerance = 1e-8
    if not p1[0] - tolerance <= p2[0] <= p1[0] + tolerance:
        return False
    return p1[1] - tolerance <= p2[1] <= p1[1] + tolerance
def project_on_segment(p: LonLat, p1: LonLat, p2: LonLat) -> float | None:
    """Project point p onto segment p1p2.

    :return: u - the position of the projection along the segment,
        measured from p1 in units of the (p2 - p1) vector, so that
        0 <= u <= 1; None if the projection falls outside the segment or
        the squared length of the segment is below 1e-14
    """
    dir_x = p2[0] - p1[0]
    dir_y = p2[1] - p1[1]
    squared_length = dir_x * dir_x + dir_y * dir_y
    if squared_length < 1e-14:
        return None

    u = ((p[0] - p1[0]) * dir_x + (p[1] - p1[1]) * dir_y) / squared_length
    if u < 0 or u > 1:
        return None
    return u
def project_on_line(p: LonLat, line: RailT) -> dict:
    """Find the point of polyline `line` that is closest to point p.

    Returns a dict with two keys, "positions_on_line" and "projected_point".
    Both stay None when the line has fewer than two vertices or when no
    vertex/segment projection is closer to p than
    MAX_DISTANCE_STOP_TO_LINE * 5.
    """
    result = {
        # In the first approximation, position on rails is the index of the
        # closest vertex of line to the point p. Fractional value means that
        # the projected point lies on a segment between two vertices.
        # More than one value can occur if a route follows the same tracks
        # more than once.
        "positions_on_line": None,
        "projected_point": None,  # (lon, lat)
    }

    if len(line) < 2:
        return result
    # Candidates must be strictly closer than this initial threshold.
    d_min = MAX_DISTANCE_STOP_TO_LINE * 5
    # Tracks whether the current best candidate is a vertex or an inner
    # point of a segment.
    closest_to_vertex = False
    # First, check vertices in the line
    for i, vertex in enumerate(line):
        d = distance(p, vertex)
        if d < d_min:
            result["positions_on_line"] = [i]
            result["projected_point"] = vertex
            d_min = d
            closest_to_vertex = True
        elif vertex == result["projected_point"]:
            # Repeated occurrence of the track vertex in line, like Oslo Line 5
            result["positions_on_line"].append(i)
    # And then calculate distances to each segment
    for seg in range(len(line) - 1):
        # Check bbox for speed
        # NOTE(review): the padding is MAX_DISTANCE_STOP_TO_LINE (documented
        # in meters) added to what look like lon/lat values - confirm units.
        if not (
            (
                min(line[seg][0], line[seg + 1][0]) - MAX_DISTANCE_STOP_TO_LINE
                <= p[0]
                <= max(line[seg][0], line[seg + 1][0])
                + MAX_DISTANCE_STOP_TO_LINE
            )
            and (
                min(line[seg][1], line[seg + 1][1]) - MAX_DISTANCE_STOP_TO_LINE
                <= p[1]
                <= max(line[seg][1], line[seg + 1][1])
                + MAX_DISTANCE_STOP_TO_LINE
            )
        ):
            continue
        u = project_on_segment(p, line[seg], line[seg + 1])
        # A falsy u is skipped: None means "no projection", and 0.0 is the
        # segment's first vertex, which the vertex loop above already covered.
        if u:
            projected_point = (
                line[seg][0] + u * (line[seg + 1][0] - line[seg][0]),
                line[seg][1] + u * (line[seg + 1][1] - line[seg][1]),
            )
            d = distance(p, projected_point)
            if d < d_min:
                result["positions_on_line"] = [seg + u]
                result["projected_point"] = projected_point
                d_min = d
                closest_to_vertex = False
            elif projected_point == result["projected_point"]:
                # Repeated occurrence of the track segment in line,
                # like Oslo Line 5
                if not closest_to_vertex:
                    result["positions_on_line"].append(seg + u)
    return result
def find_segment(
    p: LonLat, line: RailT, start_vertex: int = 0
) -> tuple[int, float] | tuple[None, None]:
    """Returns index of a segment and a position inside it.

    Segments are scanned starting from index `start_vertex`. The position
    is the relative offset of p along the segment (0.0 when p coincides
    with the segment's first vertex). Returns (None, None) when p lies on
    no scanned segment.
    """
    EPS = 1e-9
    for seg in range(start_vertex, len(line) - 1):
        if is_near(p, line[seg]):
            return seg, 0.0
        # px / py are the relative positions of p along the segment computed
        # from the first and second coordinate respectively; None means the
        # segment does not change along that axis, so no ratio can be taken.
        if line[seg][0] == line[seg + 1][0]:
            if not (p[0] - EPS <= line[seg][0] <= p[0] + EPS):
                continue
            px = None
        else:
            px = (p[0] - line[seg][0]) / (line[seg + 1][0] - line[seg][0])
        if px is None or (0 <= px <= 1):
            if line[seg][1] == line[seg + 1][1]:
                if not (p[1] - EPS <= line[seg][1] <= p[1] + EPS):
                    continue
                py = None
            else:
                py = (p[1] - line[seg][1]) / (line[seg + 1][1] - line[seg][1])
            if py is None or (0 <= py <= 1):
                # p is on the segment when both ratios agree (within EPS)
                # or when only one of them is defined.
                if py is None or px is None or (px - EPS <= py <= px + EPS):
                    return seg, px or py
    return None, None
def distance_on_line(
    p1: LonLat, p2: LonLat, line: RailT, start_vertex: int = 0
) -> tuple[float, int] | None:
    """Calculate the distance along `line` between points p1 and p2.

    Both points are located on the line with find_segment(); p1 is searched
    for starting at `start_vertex`, p2 starting at the segment of p1.

    Returns:
        A tuple (d, vertex): d is the distance and vertex is the index of
        the segment holding p2, to continue calculations for the next
        point. None if either point cannot be located on the line.

    Raises:
        Exception: if p2 is found on a segment preceding that of p1.
    """
    line_len = len(line)
    seg1, pos1 = find_segment(p1, line, start_vertex)
    if seg1 is None:
        # p1 is not on the line
        return None
    seg2, pos2 = find_segment(p2, line, seg1)
    if seg2 is None:
        if line[0] == line[-1]:
            # Closed line: unroll it once more so that p2 can be found
            # past the point where the line returns to its first vertex.
            line = line + line[1:]
            seg2, pos2 = find_segment(p2, line, seg1)
        if seg2 is None:
            # p2 is not on the line
            return None
    if seg1 == seg2:
        return distance(line[seg1], line[seg1 + 1]) * abs(pos2 - pos1), seg1
    if seg2 < seg1:
        # Should not happen
        raise Exception(f"Pos1 {seg1} is after pos2 {seg2}")
    d = 0
    # Remainder of the first segment, whole segments in between, and the
    # covered part of the last segment.
    if pos1 < 1:
        d += distance(line[seg1], line[seg1 + 1]) * (1 - pos1)
    for i in range(seg1 + 1, seg2):
        d += distance(line[i], line[i + 1])
    if pos2 > 0:
        d += distance(line[seg2], line[seg2 + 1]) * pos2
    # seg2 may index into the unrolled line; map it back to the original.
    return d, seg2 % line_len
def angle_between(p1: LonLat, c: LonLat, p2: LonLat) -> float:
    """Return the angle p1-c-p2 in whole degrees, within [0, 180].

    The angle is computed on raw coordinate differences and rounded to
    the nearest integer.
    """
    direction_to_p1 = math.atan2(p1[1] - c[1], p1[0] - c[0])
    direction_to_p2 = math.atan2(p2[1] - c[1], p2[0] - c[0])
    a = round(abs(math.degrees(direction_to_p1 - direction_to_p2)))
    if a > 180:
        return 360 - a
    return a

19
subways/osm_element.py Normal file
View file

@ -0,0 +1,19 @@
from subways.types import IdT, LonLat, OsmElementT
def el_id(el: OsmElementT) -> IdT | None:
    """Build a short element id: first letter of "type" plus "id" (or "ref").

    Returns None for a falsy element.

    Raises:
        Exception: if the element has no "type" key.
    """
    if not el:
        return None
    if "type" not in el:
        raise Exception("What is this element? {}".format(el))
    type_letter = el["type"][0]
    number = el.get("id", el.get("ref", ""))
    return type_letter + str(number)
def el_center(el: OsmElementT) -> LonLat | None:
    """Return the (lon, lat) of an element, or None if it has no coordinates.

    Own "lon"/"lat" keys are preferred; otherwise the "center" sub-dict
    is used.
    """
    if not el:
        return None
    if "lat" in el:
        return el["lon"], el["lat"]
    if "center" in el:
        center = el["center"]
        return center["lon"], center["lat"]
    return None

60
subways/overpass.py Normal file
View file

@ -0,0 +1,60 @@
import json
import logging
import time
import urllib.parse
import urllib.request
from subways.consts import MODES_OVERGROUND, MODES_RAPID
from subways.types import OsmElementT
def compose_overpass_request(
    overground: bool, bboxes: list[list[float]]
) -> str:
    """Compose an Overpass QL query covering all the given bboxes.

    For every bbox the query asks for route relations of the relevant
    modes, their route_masters, stop_area relations and the
    stop_area_groups containing them; entrance nodes are requested only
    when `overground` is false.

    Raises:
        RuntimeError: if `bboxes` is empty.
    """
    if not bboxes:
        raise RuntimeError("No bboxes given for overpass request")

    modes = MODES_OVERGROUND if overground else MODES_RAPID
    pieces = ["[out:json][timeout:1000];("]
    for bbox in bboxes:
        bbox_part = "(" + ",".join(map(str, bbox)) + ")"
        route_filters = "".join(
            f'rel[route="{mode}"]{bbox_part};' for mode in sorted(modes)
        )
        pieces.append("(" + route_filters + ");")
        pieces.append("rel(br)[type=route_master];")
        if not overground:
            pieces.append(f"node[railway=subway_entrance]{bbox_part};")
            pieces.append(f"node[railway=train_station_entrance]{bbox_part};")
        pieces.append(f"rel[public_transport=stop_area]{bbox_part};")
        pieces.append(
            "rel(br)[type=public_transport][public_transport=stop_area_group];"
        )
    pieces.append(");(._;>>;);out body center qt;")
    query = "".join(pieces)
    logging.debug("Query: %s", query)
    return query
def overpass_request(
    overground: bool, overpass_api: str, bboxes: list[list[float]]
) -> list[OsmElementT]:
    """Query Overpass API for the given bboxes and return the OSM elements.

    Args:
        overground: selects which set of modes is requested.
        overpass_api: URL of the Overpass API endpoint.
        bboxes: bounding boxes to query; must not be empty.

    Returns:
        The "elements" list of the JSON response.

    Raises:
        Exception: if the response status code is not 200.
    """
    query = compose_overpass_request(overground, bboxes)
    url = f"{overpass_api}?data={urllib.parse.quote(query)}"
    # The context manager closes the HTTP response even when the status
    # check or JSON parsing fails.
    with urllib.request.urlopen(url, timeout=1000) as response:
        if (r_code := response.getcode()) != 200:
            raise Exception(f"Failed to query Overpass API: HTTP {r_code}")
        return json.load(response)["elements"]
def multi_overpass(
    overground: bool, overpass_api: str, bboxes: list[list[float]]
) -> list[OsmElementT]:
    """Query Overpass in batches of bboxes and concatenate the results.

    Bboxes are sent 10 at a time, with a 5 second pause between
    consecutive requests.
    """
    SLICE_SIZE = 10
    INTERREQUEST_WAIT = 5  # in seconds

    collected = []
    batch_starts = range(0, len(bboxes), SLICE_SIZE)
    for batch_number, start in enumerate(batch_starts):
        if batch_number:
            time.sleep(INTERREQUEST_WAIT)
        batch = bboxes[start : start + SLICE_SIZE]  # noqa E203
        collected += overpass_request(overground, overpass_api, batch)
    return collected

View file

@ -1,4 +1,8 @@
# Import only those processors (modules) you want to use.
# Ignore F401 "module imported but unused" violation since these modules
# are addressed via introspection.
from . import mapsme, gtfs # noqa F401
from . import gtfs, mapsme # noqa F401
from ._common import transit_to_dict
__all__ = ["gtfs", "mapsme", "transit_to_dict"]

View file

@ -1,4 +1,12 @@
from subway_structure import City, el_center, TransfersT
from __future__ import annotations
import typing
from subways.osm_element import el_center
from subways.types import TransfersT
if typing.TYPE_CHECKING:
from subways.structure.city import City
DEFAULT_INTERVAL = 2.5 * 60 # seconds
KMPH_TO_MPS = 1 / 3.6 # km/h to m/s conversion multiplier

View file

@ -1,4 +1,7 @@
from __future__ import annotations
import csv
import typing
from functools import partial
from io import BytesIO, StringIO
from itertools import permutations
@ -12,11 +15,11 @@ from ._common import (
TRANSFER_PENALTY,
transit_to_dict,
)
from subway_structure import (
City,
distance,
TransfersT,
)
from subways.types import TransfersT
from subways.geom_utils import distance
if typing.TYPE_CHECKING:
from subways.structure.city import City
DEFAULT_TRIP_START_TIME = (5, 0) # 05:00

View file

@ -1,22 +1,18 @@
from __future__ import annotations
import json
import logging
import os
import typing
from collections import defaultdict
from collections.abc import Callable
from typing import Any, TypeAlias
from subway_structure import (
City,
DISPLACEMENT_TOLERANCE,
distance,
el_center,
IdT,
LonLat,
OsmElementT,
Station,
StopArea,
TransfersT,
)
from subways.consts import DISPLACEMENT_TOLERANCE
from subways.geom_utils import distance
from subways.osm_element import el_center
from subways.structure.station import Station
from subways.types import IdT, LonLat, OsmElementT, TransfersT
from ._common import (
DEFAULT_INTERVAL,
format_colour,
@ -25,6 +21,11 @@ from ._common import (
TRANSFER_PENALTY,
)
if typing.TYPE_CHECKING:
from subways.structure.city import City
from subways.structure.stop_area import StopArea
OSM_TYPES = {"n": (0, "node"), "w": (2, "way"), "r": (3, "relation")}
ENTRANCE_PENALTY = 60 # seconds
SPEED_TO_ENTRANCE = 5 * KMPH_TO_MPS # m/s

View file

@ -0,0 +1,17 @@
from .city import City, get_unused_subway_entrances_geojson
from .route import Route
from .route_master import RouteMaster
from .route_stop import RouteStop
from .station import Station
from .stop_area import StopArea
__all__ = [
"City",
"get_unused_subway_entrances_geojson",
"Route",
"RouteMaster",
"RouteStop",
"Station",
"StopArea",
]

626
subways/structure/city.py Normal file
View file

@ -0,0 +1,626 @@
from __future__ import annotations
from collections import Counter, defaultdict
from collections.abc import Collection, Iterator
from itertools import chain
from subways.consts import (
DEFAULT_MODES_OVERGROUND,
DEFAULT_MODES_RAPID,
)
from subways.osm_element import el_center, el_id
from subways.structure.route import Route
from subways.structure.route_master import RouteMaster
from subways.structure.station import Station
from subways.structure.stop_area import StopArea
from subways.types import (
IdT,
OsmElementT,
TransfersT,
TransferT,
)
# Relative shortfall of found vs expected counts that City.validate() still
# reports as a warning rather than an error.
ALLOWED_STATIONS_MISMATCH = 0.02  # part of total station count
ALLOWED_TRANSFERS_MISMATCH = 0.07  # part of total interchanges count

# Ids of subway entrances found among some city's stations. Module-level, so
# it is shared by all City instances; filled by City.count_unused_entrances().
used_entrances = set()
def format_elid_list(ids: Collection[IdT]) -> str:
    """Join ids, sorted, into a comma-separated string.

    At most the first 20 ids are listed; ", ..." marks the omitted rest.
    """
    listed = sorted(ids)[:20]
    ellipsis = ", ..." if len(ids) > 20 else ""
    return ", ".join(listed) + ellipsis
class City:
route_class = Route
def __init__(self, city_data: dict, overground: bool = False) -> None:
    """Initialize a city from its configuration record.

    Reads the "id", "name", "country", "continent", "networks" and "bbox"
    keys of `city_data`, plus the expected-count keys ("num_*") relevant
    to the rapid or overground mode. Count values that cannot be parsed
    are recorded as errors of the city.
    """
    self.validate_called = False
    self.errors: list[str] = []
    self.warnings: list[str] = []
    self.notices: list[str] = []
    self.id = None
    self.try_fill_int_attribute(city_data, "id")
    self.name = city_data["name"]
    self.country = city_data["country"]
    self.continent = city_data["continent"]
    self.overground = overground
    if not overground:
        # "num_stations" has no default: an empty value is an error.
        self.try_fill_int_attribute(city_data, "num_stations")
        self.try_fill_int_attribute(city_data, "num_lines", "0")
        self.try_fill_int_attribute(city_data, "num_light_lines", "0")
        self.try_fill_int_attribute(city_data, "num_interchanges", "0")
    else:
        self.try_fill_int_attribute(city_data, "num_tram_lines", "0")
        self.try_fill_int_attribute(city_data, "num_trolleybus_lines", "0")
        self.try_fill_int_attribute(city_data, "num_bus_lines", "0")
        self.try_fill_int_attribute(city_data, "num_other_lines", "0")

    # Acquiring list of networks and modes
    # The value is split on ":"; the last part lists networks separated
    # by ";", and the first part (when there are at least two parts)
    # lists modes separated by ",".
    networks = (
        None
        if not city_data["networks"]
        else city_data["networks"].split(":")
    )
    if not networks or len(networks[-1]) == 0:
        self.networks = []
    else:
        self.networks = set(
            filter(None, [x.strip() for x in networks[-1].split(";")])
        )
    if not networks or len(networks) < 2 or len(networks[0]) == 0:
        if self.overground:
            self.modes = DEFAULT_MODES_OVERGROUND
        else:
            self.modes = DEFAULT_MODES_RAPID
    else:
        self.modes = {x.strip() for x in networks[0].split(",")}

    # Reversing bbox so it is (xmin, ymin, xmax, ymax)
    # NOTE(review): contains() compares bbox[0]/bbox[2] with the second
    # (lat) coordinate of an element's center - confirm the axis order.
    bbox = city_data["bbox"].split(",")
    if len(bbox) == 4:
        self.bbox = [float(bbox[i]) for i in (1, 0, 3, 2)]
    else:
        self.bbox = None

    self.elements: dict[IdT, OsmElementT] = {}
    self.stations: dict[IdT, list[StopArea]] = defaultdict(list)
    self.routes: dict[str, RouteMaster] = {}  # keys are route_master refs
    self.masters: dict[IdT, OsmElementT] = {}  # Route id → master element
    # Member id → stop_area relations that list this member
    self.stop_areas: dict[IdT, list[OsmElementT]] = defaultdict(list)
    self.transfers: list[set[StopArea]] = []
    self.station_ids: set[IdT] = set()
    self.stops_and_platforms: set[IdT] = set()
    self.recovery_data = None
def try_fill_int_attribute(
    self, city_data: dict, attr: str, default: str | None = None
) -> None:
    """Set self.<attr> to the integer value of city_data[attr].

    Conversion is considered to fail if one of the following is true:
    * attr is not empty and data type casting fails;
    * attr is empty (empty string or None) and no default value is given.
    In such cases the city is marked as bad by adding an error
    to the city validation log, and the attribute is set to 0.

    Args:
        city_data: configuration record of the city.
        attr: key in `city_data` and name of the attribute to set.
        default: value used instead of an empty `city_data[attr]`.
    """
    raw_value = city_data[attr]
    attr_value = raw_value
    if not attr_value and default is not None:
        attr_value = default

    try:
        attr_int = int(attr_value)
    except (TypeError, ValueError):
        # int(None) raises TypeError rather than ValueError; a missing
        # value must be reported like any other unparsable one.
        print_value = f"{raw_value}" if raw_value else "<empty>"
        self.error(
            f"Configuration error: wrong value for {attr}: {print_value}"
        )
        setattr(self, attr, 0)
    else:
        setattr(self, attr, attr_int)
@staticmethod
def log_message(message: str, el: OsmElementT) -> str:
if el:
tags = el.get("tags", {})
message += ' ({} {}, "{}")'.format(
el["type"],
el.get("id", el.get("ref")),
tags.get("name", tags.get("ref", "")),
)
return message
def notice(self, message: str, el: OsmElementT | None = None) -> None:
    """Record a notice: a message that may point to a potential problem."""
    self.notices.append(City.log_message(message, el))
def warn(self, message: str, el: OsmElementT | None = None) -> None:
    """Record a warning.

    A warning is definitely a problem, but it neither prevents building
    a routing file nor invalidates the city.
    """
    self.warnings.append(City.log_message(message, el))
def error(self, message: str, el: OsmElementT | None = None) -> None:
    """Record an error: a critical problem that invalidates the city."""
    self.errors.append(City.log_message(message, el))
def contains(self, el: OsmElementT) -> bool:
    """Tell whether the center of `el` falls inside the city bbox.

    Elements without a known center are never contained.
    """
    center = el_center(el)
    if not center:
        return False
    second_coord_inside = self.bbox[0] <= center[1] <= self.bbox[2]
    return second_coord_inside and (
        self.bbox[1] <= center[0] <= self.bbox[3]
    )
def add(self, el: OsmElementT) -> None:
    """Register an OSM element with the city.

    Every element is stored in self.elements, except relations without
    "members", which are ignored. In addition:
    * for a route_master relation, each member relation is mapped to
      the master in self.masters (a route found in two masters is
      reported as an error and the later master wins);
    * for a stop_area relation of type public_transport, the relation
      is appended to self.stop_areas of each of its members. A member
      listed more than once is added only once, and one warning per
      relation is issued.
    """
    if el["type"] == "relation" and "members" not in el:
        return

    self.elements[el_id(el)] = el
    if not (el["type"] == "relation" and "tags" in el):
        return

    relation_type = el["tags"].get("type")
    if relation_type == "route_master":
        for m in el["members"]:
            if m["type"] != "relation":
                continue

            if el_id(m) in self.masters:
                self.error("Route in two route_masters", m)
            self.masters[el_id(m)] = el

    elif el["tags"].get("public_transport") == "stop_area":
        if relation_type != "public_transport":
            self.warn(
                "stop_area relation with "
                f"type={relation_type}, needed type=public_transport",
                el,
            )
            return

        warned_about_duplicates = False
        for m in el["members"]:
            stop_areas = self.stop_areas[el_id(m)]
            if el in stop_areas:
                # Never store the same relation twice for a member, even
                # after the (single) warning has already been issued.
                if not warned_about_duplicates:
                    self.warn("Duplicate element in a stop area", el)
                    warned_about_duplicates = True
            else:
                stop_areas.append(el)
def make_transfer(self, stoparea_group: OsmElementT) -> None:
    """Build a transfer from the members of a stop_area_group relation.

    Collects the StopArea objects of the group's members that are known
    stop_area relations of this city, marks each of them with the group
    id and stores the resulting set in self.transfers if it holds more
    than one stop area.
    """
    transfer: set[StopArea] = set()
    for m in stoparea_group["members"]:
        k = el_id(m)
        el = self.elements.get(k)
        if not el:
            # A stoparea_group member may validly not belong to the city
            # while the stoparea_group does - near the city bbox boundary
            continue
        if "tags" not in el:
            self.warn(
                "An untagged object {} in a stop_area_group".format(k),
                stoparea_group,
            )
            continue
        # Only public_transport stop_area relations can form a transfer.
        if (
            el["type"] != "relation"
            or el["tags"].get("type") != "public_transport"
            or el["tags"].get("public_transport") != "stop_area"
        ):
            continue
        if k in self.stations:
            # The first StopArea registered under this id is used.
            stoparea = self.stations[k][0]
            transfer.add(stoparea)
            if stoparea.transfer:
                # TODO: properly process such cases.
                # Counterexample 1: Paris,
                #               Châtelet subway station <->
                #               "Châtelet - Les Halles" railway station <->
                #               Les Halles subway station
                # Counterexample 2: Saint-Petersburg, transfers
                #               Витебский вокзал <->
                #               Пушкинская <->
                #               Звенигородская
                self.warn(
                    "Stop area {} belongs to multiple interchanges".format(
                        k
                    )
                )
            # The latest group wins if the stop area already had one.
            stoparea.transfer = el_id(stoparea_group)
    if len(transfer) > 1:
        self.transfers.append(transfer)
def extract_routes(self) -> None:
    """Build stations, stop areas, routes and transfers from self.elements.

    Fills self.station_ids, self.stations, self.stops_and_platforms,
    self.routes and self.transfers. Problems found on the way are logged
    through notice()/warn().
    """
    # Extract stations
    processed_stop_areas = set()
    for el in self.elements.values():
        if Station.is_station(el, self.modes):
            # See PR https://github.com/mapsme/subways/pull/98
            if (
                el["type"] == "relation"
                and el["tags"].get("type") != "multipolygon"
            ):
                rel_type = el["tags"].get("type")
                self.warn(
                    "A railway station cannot be a relation of type "
                    f"{rel_type}",
                    el,
                )
                continue
            st = Station(el, self)
            self.station_ids.add(st.id)
            # One StopArea per stop_area relation the station is a member
            # of, or a single StopArea without a relation otherwise.
            if st.id in self.stop_areas:
                stations = []
                for sa in self.stop_areas[st.id]:
                    stations.append(StopArea(st, self, sa))
            else:
                stations = [StopArea(st, self)]

            for station in stations:
                if station.id not in processed_stop_areas:
                    processed_stop_areas.add(station.id)
                    for st_el in station.get_elements():
                        self.stations[st_el].append(station)

                    # Check that stops and platforms belong to
                    # a single stop_area
                    for sp in chain(station.stops, station.platforms):
                        if sp in self.stops_and_platforms:
                            self.notice(
                                f"A stop or a platform {sp} belongs to "
                                "multiple stop areas, might be correct"
                            )
                        else:
                            self.stops_and_platforms.add(sp)

    # Extract routes
    for el in self.elements.values():
        if Route.is_route(el, self.modes):
            if el["tags"].get("access") in ("no", "private"):
                continue
            route_id = el_id(el)
            master = self.masters.get(route_id, None)
            # When the city is restricted to certain networks, keep the
            # route only if it or its master belongs to one of them.
            if self.networks:
                network = Route.get_network(el)
                if master:
                    master_network = Route.get_network(master)
                else:
                    master_network = None
                if (
                    network not in self.networks
                    and master_network not in self.networks
                ):
                    continue

            route = self.route_class(el, self, master)
            if not route.stops:
                self.warn("Route has no stops", el)
                continue
            elif len(route.stops) == 1:
                self.warn("Route has only one stop", el)
                continue

            # Routes are grouped by master id, or by ref without a master.
            k = el_id(master) if master else route.ref
            if k not in self.routes:
                self.routes[k] = RouteMaster(self, master)
            self.routes[k].add(route)

            # Sometimes adding a route to a newly initialized RouteMaster
            # can fail
            if len(self.routes[k]) == 0:
                del self.routes[k]

        # And while we're iterating over relations, find interchanges
        if (
            el["type"] == "relation"
            and el.get("tags", {}).get("public_transport", None)
            == "stop_area_group"
        ):
            self.make_transfer(el)

    # Filter transfers, leaving only stations that belong to routes
    own_stopareas = set(self.stopareas())
    self.transfers = [
        inner_transfer
        for inner_transfer in (
            own_stopareas.intersection(transfer)
            for transfer in self.transfers
        )
        if len(inner_transfer) > 1
    ]
def __iter__(self) -> Iterator[RouteMaster]:
return iter(self.routes.values())
def stopareas(self) -> Iterator[StopArea]:
    """Yield every stop area used by the city's route masters, once each."""
    seen = set()
    for route_master in self:
        for stoparea in route_master.stopareas():
            if stoparea in seen:
                continue
            yield stoparea
            seen.add(stoparea)
@property
def is_good(self) -> bool:
    """True if no errors have been recorded for the city.

    Raises:
        RuntimeError: if read before validate() was called while no
            error has been recorded yet.
    """
    if not self.errors and not self.validate_called:
        raise RuntimeError(
            "You mustn't refer to City.is_good property before calling "
            "the City.validate() method unless an error already occurred."
        )
    return not self.errors
def get_validation_result(self) -> dict:
    """Summarize the validation outcome of the city as a plain dict.

    Counters that have not been computed yet are reported as 0.
    """

    def counter(attr_name: str):
        # Attributes set during validation may not exist yet.
        return getattr(self, attr_name, 0)

    result = {
        "name": self.name,
        "country": self.country,
        "continent": self.continent,
        "stations_found": counter("found_stations"),
        "transfers_found": counter("found_interchanges"),
        "unused_entrances": counter("unused_entrances"),
        "networks": counter("found_networks"),
    }

    if self.overground:
        mode_specific = {
            "stations_expected": 0,
            "transfers_expected": 0,
            "busl_expected": counter("num_bus_lines"),
            "trolleybusl_expected": counter("num_trolleybus_lines"),
            "traml_expected": counter("num_tram_lines"),
            "otherl_expected": counter("num_other_lines"),
            "busl_found": counter("found_bus_lines"),
            "trolleybusl_found": counter("found_trolleybus_lines"),
            "traml_found": counter("found_tram_lines"),
            "otherl_found": counter("found_other_lines"),
        }
    else:
        mode_specific = {
            "subwayl_expected": counter("num_lines"),
            "lightrl_expected": counter("num_light_lines"),
            "subwayl_found": counter("found_lines"),
            "lightrl_found": counter("found_light_lines"),
            "stations_expected": counter("num_stations"),
            "transfers_expected": counter("num_interchanges"),
        }
    result.update(mode_specific)

    result["warnings"] = self.warnings
    result["errors"] = self.errors
    result["notices"] = self.notices
    return result
def count_unused_entrances(self) -> None:
    """Count subway entrances that are not attached to stations.

    Sets self.unused_entrances and self.entrances_not_in_stop_areas,
    adds notices listing the offending entrances, and records the
    entrances that are keys of self.stations in the module-level
    `used_entrances` set.
    """
    global used_entrances
    # Ids of all members of stop_area relations of the city
    stop_areas = set()
    for el in self.elements.values():
        if (
            el["type"] == "relation"
            and "tags" in el
            and el["tags"].get("public_transport") == "stop_area"
            and "members" in el
        ):
            stop_areas.update([el_id(m) for m in el["members"]])
    unused = []
    not_in_sa = []
    for el in self.elements.values():
        if (
            el["type"] == "node"
            and "tags" in el
            and el["tags"].get("railway") == "subway_entrance"
        ):
            i = el_id(el)
            if i in self.stations:
                used_entrances.add(i)
            if i not in stop_areas:
                not_in_sa.append(i)
                # NOTE(review): nesting of this check under the previous
                # one is reconstructed - confirm against the repository.
                if i not in self.stations:
                    unused.append(i)
    self.unused_entrances = len(unused)
    self.entrances_not_in_stop_areas = len(not_in_sa)
    if unused:
        self.notice(
            f"{len(unused)} subway entrances are not connected to a "
            f"station: {format_elid_list(unused)}"
        )
    if not_in_sa:
        self.notice(
            f"{len(not_in_sa)} subway entrances are not in stop_area "
            f"relations: {format_elid_list(not_in_sa)}"
        )
def validate_lines(self) -> None:
    """Compare found subway / light rail line counts with expected ones.

    Every route master whose mode is not "subway" counts as a light rail
    line. Each mismatch is recorded as an error.
    """
    light_lines = sum(
        1 for route_master in self.routes.values()
        if route_master.mode != "subway"
    )
    self.found_light_lines = light_lines
    self.found_lines = len(self.routes) - light_lines

    checks = (
        (
            self.found_lines,
            self.num_lines,
            "Found {} subway lines, expected {}",
        ),
        (
            self.found_light_lines,
            self.num_light_lines,
            "Found {} light rail lines, expected {}",
        ),
    )
    for found, expected, template in checks:
        if found != expected:
            self.error(template.format(found, expected))
def validate_overground_lines(self) -> None:
    """Count overground lines by mode and check the number of tram lines.

    Only the tram line count is compared with its expected value: a
    mismatch is an error when no tram lines were found at all, and
    a notice otherwise.
    """
    modes = [route_master.mode for route_master in self.routes.values()]
    self.found_tram_lines = modes.count("tram")
    self.found_bus_lines = modes.count("bus")
    self.found_trolleybus_lines = modes.count("trolleybus")
    self.found_other_lines = sum(
        1 for mode in modes if mode not in ("bus", "trolleybus", "tram")
    )

    if self.found_tram_lines == self.num_tram_lines:
        return
    if self.found_tram_lines == 0:
        log_function = self.error
    else:
        log_function = self.notice
    log_function(
        "Found {} tram lines, expected {}".format(
            self.found_tram_lines, self.num_tram_lines
        ),
    )
def validate(self) -> None:
    """Compare what was extracted for the city with the expected numbers.

    Counts stations, interchanges, lines and networks, logging
    mismatches as errors, warnings or notices, and finally sets
    self.validate_called.
    """
    networks = Counter()
    self.found_stations = 0
    unused_stations = set(self.station_ids)
    for rmaster in self.routes.values():
        networks[str(rmaster.network)] += 1
        if not self.overground:
            rmaster.check_return_routes()
        route_stations = set()
        for sa in rmaster.stopareas():
            # Stop areas sharing a transfer are counted as one station
            # within a route master.
            route_stations.add(sa.transfer or sa.id)
            unused_stations.discard(sa.station.id)
        self.found_stations += len(route_stations)
    if unused_stations:
        self.unused_stations = len(unused_stations)
        self.notice(
            "{} unused stations: {}".format(
                self.unused_stations, format_elid_list(unused_stations)
            )
        )
    self.count_unused_entrances()
    self.found_interchanges = len(self.transfers)

    if self.overground:
        self.validate_overground_lines()
    else:
        self.validate_lines()

        if self.found_stations != self.num_stations:
            msg = "Found {} stations in routes, expected {}".format(
                self.found_stations, self.num_stations
            )
            # An error unless the shortfall is within
            # ALLOWED_STATIONS_MISMATCH of the expected count; finding
            # more stations than expected is an error too.
            log_function = (
                self.error
                if self.num_stations > 0
                and not (
                    0
                    <= (self.num_stations - self.found_stations)
                    / self.num_stations
                    <= ALLOWED_STATIONS_MISMATCH
                )
                else self.warn
            )
            log_function(msg)

        if self.found_interchanges != self.num_interchanges:
            msg = "Found {} interchanges, expected {}".format(
                self.found_interchanges, self.num_interchanges
            )
            # Unlike the stations check, there is no lower bound here, so
            # finding more interchanges than expected is only a warning.
            log_function = (
                self.error
                if self.num_interchanges != 0
                and not (
                    (self.num_interchanges - self.found_interchanges)
                    / self.num_interchanges
                    <= ALLOWED_TRANSFERS_MISMATCH
                )
                else self.warn
            )
            log_function(msg)

    self.found_networks = len(networks)
    if len(networks) > max(1, len(self.networks)):
        n_str = "; ".join(
            ["{} ({})".format(k, v) for k, v in networks.items()]
        )
        self.notice("More than one network: {}".format(n_str))

    self.validate_called = True
def calculate_distances(self) -> None:
    """Ask every route of every route master to calculate its distances."""
    for route in chain.from_iterable(self):
        route.calculate_distances()
def find_transfers(
    elements: list[OsmElementT], cities: Collection[City]
) -> TransfersT:
    """Collect transfers as sets of stop area ids.

    As for now, two Cities may contain the same stoparea, but those
    StopArea instances would have different python id. So we don't store
    references to StopAreas, but only their ids. This is important at
    inter-city interchanges.

    Only stop areas of good cities are considered, and only groups with
    more than one such stop area make a transfer.
    """
    stop_area_groups = []
    for el in elements:
        if el["type"] != "relation" or "members" not in el:
            continue
        if el.get("tags", {}).get("public_transport") != "stop_area_group":
            continue
        stop_area_groups.append(el)

    stopareas_in_cities_ids = set()
    for city in cities:
        if city.is_good:
            for stoparea in city.stopareas():
                stopareas_in_cities_ids.add(stoparea.id)

    transfers = []
    for stop_area_group in stop_area_groups:
        transfer: TransferT = set()
        for member in stop_area_group["members"]:
            member_id = el_id(member)
            if member_id in stopareas_in_cities_ids:
                transfer.add(member_id)
        if len(transfer) > 1:
            transfers.append(transfer)
    return transfers
def get_unused_subway_entrances_geojson(elements: list[OsmElementT]) -> dict:
    """Build a GeoJSON FeatureCollection of unused subway entrances.

    An entrance is a node tagged railway=subway_entrance; it is unused
    when its id is absent from the module-level `used_entrances` set.
    Feature properties are the node tags without "railway" and "entrance".
    """

    def is_unused_entrance(el: OsmElementT) -> bool:
        if el["type"] != "node" or "tags" not in el:
            return False
        if el["tags"].get("railway") != "subway_entrance":
            return False
        return el_id(el) not in used_entrances

    features = [
        {
            "type": "Feature",
            "geometry": {"type": "Point", "coordinates": el_center(el)},
            "properties": {
                k: v
                for k, v in el["tags"].items()
                if k not in ("railway", "entrance")
            },
        }
        for el in elements
        if is_unused_entrance(el)
    ]
    return {"type": "FeatureCollection", "features": features}

903
subways/structure/route.py Normal file
View file

@ -0,0 +1,903 @@
from __future__ import annotations
import re
import typing
from collections.abc import Callable, Iterator
from itertools import islice
from subways.consts import (
CONSTRUCTION_KEYS,
DISPLACEMENT_TOLERANCE,
MAX_DISTANCE_STOP_TO_LINE,
)
from subways.css_colours import normalize_colour
from subways.geom_utils import (
angle_between,
distance,
distance_on_line,
find_segment,
project_on_line,
)
from subways.osm_element import el_id, el_center
from subways.structure.route_stop import RouteStop
from subways.structure.station import Station
from subways.structure.stop_area import StopArea
from subways.types import CriticalValidationError, IdT, OsmElementT, RailT
if typing.TYPE_CHECKING:
from subways.structure.city import City
# Matches the first "HH:MM-HH:MM" substring of a string, capturing the four
# two-digit numbers as separate groups.
START_END_TIMES_RE = re.compile(r".*?(\d{2}):(\d{2})-(\d{2}):(\d{2}).*")

ALLOWED_ANGLE_BETWEEN_STOPS = 45  # in degrees
DISALLOWED_ANGLE_BETWEEN_STOPS = 20  # in degrees
def get_start_end_times(
    opening_hours: str,
) -> tuple[tuple[int, int], tuple[int, int]] | tuple[None, None]:
    """Parse an OSM opening_hours value in a very simplified way.

    Only the first HH:MM-HH:MM substring is taken, as the most probable
    opening hours interval for most of the weekdays.

    Returns:
        ((start_hour, start_minute), (end_hour, end_minute)), or
        (None, None) when no such substring is found.
    """
    match = START_END_TIMES_RE.match(opening_hours)
    if not match:
        return None, None
    start_hour, start_minute, end_hour, end_minute = (
        int(group) for group in match.groups()
    )
    return (start_hour, start_minute), (end_hour, end_minute)
def osm_interval_to_seconds(interval_str: str) -> int | None:
    """Convert an OSM 'interval'/'headway' tag value to seconds.

    Accepted formats:
        HH:MM:SS,
        HH:MM,
        MM,
        M
    (https://wiki.openstreetmap.org/wiki/Key:interval#Format)

    Returns None when the value has more than two colons or contains
    a non-integer component.
    """
    components = interval_str.split(":")
    if len(components) > 3:
        return None
    try:
        numbers = [int(component) for component in components]
    except ValueError:
        return None

    if len(numbers) == 1:
        # A lone number means minutes.
        hours, minutes, seconds = 0, numbers[0], 0
    else:
        hours, minutes = numbers[0], numbers[1]
        seconds = numbers[2] if len(numbers) == 3 else 0
    return seconds + 60 * minutes + 60 * 60 * hours
class Route:
"""The longest route for a city with a unique ref."""
@staticmethod
def is_route(el: OsmElementT, modes: set[str]) -> bool:
if (
el["type"] != "relation"
or el.get("tags", {}).get("type") != "route"
):
return False
if "members" not in el:
return False
if el["tags"].get("route") not in modes:
return False
for k in CONSTRUCTION_KEYS:
if k in el["tags"]:
return False
if "ref" not in el["tags"] and "name" not in el["tags"]:
return False
return True
@staticmethod
def get_network(relation: OsmElementT) -> str | None:
for k in ("network:metro", "network", "operator"):
if k in relation["tags"]:
return relation["tags"][k]
return None
@staticmethod
def get_interval(tags: dict) -> int | None:
    """Return the service interval from "interval"/"headway" tags, in seconds.

    Returns None when no such tag is found, when its value is empty, or
    when osm_interval_to_seconds() cannot parse it.
    """
    v = None
    for k in ("interval", "headway"):
        if k in tags:
            v = tags[k]
            break
        else:
            # No exact key: fall back to the first "<k>:<suffix>" key.
            # The break below leaves only this inner loop, so the outer
            # loop still goes on to the next key.
            for kk in tags:
                if kk.startswith(k + ":"):
                    v = tags[kk]
                    break
    if not v:
        return None
    return osm_interval_to_seconds(v)
def stopareas(self) -> Iterator[StopArea]:
    """Yield the stop areas of the route's stops, each one only once."""
    seen = set()
    for stoparea in (route_stop.stoparea for route_stop in self):
        if stoparea in seen:
            continue
        yield stoparea
        seen.add(stoparea)
def __init__(
    self,
    relation: OsmElementT,
    city: City,
    master: OsmElementT | None = None,
) -> None:
    """Build a route from its OSM relation.

    Args:
        relation: the route relation; must satisfy Route.is_route().
        city: the city the route belongs to.
        master: the route_master relation of the route, if any.
    """
    assert Route.is_route(
        relation, city.modes
    ), f"The relation does not seem to be a route: {relation}"
    self.city = city
    self.element: OsmElementT = relation
    self.id: IdT = el_id(relation)

    # Defaults; the process_*() calls at the end run once all of these
    # attributes exist.
    self.ref = None
    self.name = None
    self.mode = None
    self.colour = None
    self.infill = None
    self.network = None
    self.interval = None
    self.start_time = None
    self.end_time = None
    self.is_circular = False
    self.stops: list[RouteStop] = []
    # Would be a list of (lon, lat) for the longest stretch. Can be empty.
    self.tracks = None
    # Index of the first stop that is located on/near the self.tracks
    self.first_stop_on_rails_index = None
    # Index of the last stop that is located on/near the self.tracks
    self.last_stop_on_rails_index = None

    self.process_tags(master)
    stop_position_elements = self.process_stop_members()
    self.process_tracks(stop_position_elements)
def build_longest_line(self) -> tuple[list[IdT], set[IdT]]:
    """Chain the track members of the route into continuous stretches.

    Returns:
        A tuple (track, line_nodes): `track` is the list of node ids of
        the longest continuous stretch, with consecutive duplicates
        removed; `line_nodes` is the set of node ids of all track
        members that have nodes.
    """
    line_nodes: set[IdT] = set()
    # Longest finished stretch so far and the stretch being built
    last_track: list[IdT] = []
    track: list[IdT] = []
    warned_about_holes = False
    for m in self.element["members"]:
        el = self.city.elements.get(el_id(m), None)
        if not el or not StopArea.is_track(el):
            continue
        if "nodes" not in el or len(el["nodes"]) < 2:
            self.city.error("Cannot find nodes in a railway", el)
            continue
        nodes: list[IdT] = ["n{}".format(n) for n in el["nodes"]]
        if m["role"] == "backward":
            nodes.reverse()
        line_nodes.update(nodes)
        if not track:
            # is_first marks that the stretch holds just one way, so it
            # may still be reversed to fit the next way.
            is_first = True
            track.extend(nodes)
        else:
            new_segment = list(nodes)  # copying
            if new_segment[0] == track[-1]:
                track.extend(new_segment[1:])
            elif new_segment[-1] == track[-1]:
                track.extend(reversed(new_segment[:-1]))
            elif is_first and track[0] in (
                new_segment[0],
                new_segment[-1],
            ):
                # We can reverse the track and try again
                track.reverse()
                if new_segment[0] == track[-1]:
                    track.extend(new_segment[1:])
                else:
                    track.extend(reversed(new_segment[:-1]))
            else:
                # Store the track if it is long and clean it
                # Only the first hole of a route is reported.
                if not warned_about_holes:
                    self.city.warn(
                        "Hole in route rails near node {}".format(
                            track[-1]
                        ),
                        self.element,
                    )
                    warned_about_holes = True
                if len(track) > len(last_track):
                    last_track = track
                track = []
            is_first = False
    if len(track) > len(last_track):
        last_track = track
    # Remove duplicate points
    last_track = [
        last_track[i]
        for i in range(0, len(last_track))
        if i == 0 or last_track[i - 1] != last_track[i]
    ]
    return last_track, line_nodes
def get_stop_projections(self) -> tuple[list[dict], Callable[[int], bool]]:
    """Project every stop of the route onto self.tracks.

    Returns:
        A tuple (projected, criterion): `projected` holds the
        project_on_line() result for each stop, in stop order;
        `criterion(i)` tells whether stop i has a projection lying no
        farther than MAX_DISTANCE_STOP_TO_LINE from the stop.
    """
    projected = []
    for route_stop in self.stops:
        projected.append(project_on_line(route_stop.stop, self.tracks))

    def stop_near_tracks_criterion(stop_index: int) -> bool:
        if projected[stop_index]["projected_point"] is None:
            return False
        gap = distance(
            self.stops[stop_index].stop,
            projected[stop_index]["projected_point"],
        )
        return gap <= MAX_DISTANCE_STOP_TO_LINE

    return projected, stop_near_tracks_criterion
def project_stops_on_line(self) -> dict:
    """Collect projection data for the stops lying along self.tracks.

    Leading and trailing stops that are not near the tracks are left
    out; the indexes of the first and last remaining stops are reported.

    :return: dict with keys "first_stop_on_rails_index",
        "last_stop_on_rails_index" and "stops_on_longest_line" - a list
        of {"route_stop": RouteStop, "coords": LonLat or None,
        "positions_on_rails": list}.
    """
    projected, stop_near_tracks_criterion = self.get_stop_projections()
    stops_count = len(self.stops)

    # Skip leading stops that are away from the rails.
    first_index = 0
    while first_index < stops_count and not stop_near_tracks_criterion(
        first_index
    ):
        first_index += 1

    # Skip trailing stops that are away from the rails.
    last_index = stops_count - 1
    while last_index > first_index and not stop_near_tracks_criterion(
        last_index
    ):
        last_index -= 1

    stops_on_longest_line = []
    for i in range(first_index, last_index + 1):
        route_stop = self.stops[i]
        projected_point = projected[i]["projected_point"]
        if projected_point is None:
            self.city.error(
                f'Stop "{route_stop.stoparea.name}" {route_stop.stop} '
                "is nowhere near the tracks",
                self.element,
            )
            continue

        # We've got two separate stations with a good stretch of
        # railway tracks between them. Put these on tracks.
        d = round(distance(route_stop.stop, projected_point))
        if d > MAX_DISTANCE_STOP_TO_LINE:
            self.city.notice(
                f'Stop "{route_stop.stoparea.name}" {route_stop.stop} '
                f"is {d} meters from the tracks",
                self.element,
            )
            coords = None
        else:
            coords = projected_point
        stops_on_longest_line.append(
            {
                "route_stop": route_stop,
                "coords": coords,
                "positions_on_rails": projected[i]["positions_on_line"],
            }
        )

    return {
        "first_stop_on_rails_index": first_index,
        "last_stop_on_rails_index": last_index,
        "stops_on_longest_line": stops_on_longest_line,
    }
def calculate_distances(self) -> None:
    """Fill `distance` of every stop with the cumulative route length.

    The leg between two neighbouring stops is measured along the tracks
    when both lie on rails and the measured value is plausible
    (not shorter than the direct distance minus 10 and not longer than
    twice the direct distance); otherwise the direct distance is used.
    """
    total = 0
    vertex = 0
    for idx, cur_stop in enumerate(self.stops):
        if idx > 0:
            prev_stop = self.stops[idx - 1]
            direct = distance(cur_stop.stop, prev_stop.stop)
            on_rails = (
                self.first_stop_on_rails_index
                <= idx
                <= self.last_stop_on_rails_index
            )
            d_line = (
                distance_on_line(
                    prev_stop.stop, cur_stop.stop, self.tracks, vertex
                )
                if on_rails
                else None
            )
            if d_line and direct - 10 <= d_line[0] <= direct * 2:
                # Continue the next search from the vertex reached here.
                vertex = d_line[1]
                total += round(d_line[0])
            else:
                total += round(direct)
        cur_stop.distance = total
def process_tags(self, master: OsmElementT) -> None:
    """Fill route attributes from tags of the relation and its master.

    A tag of the route relation takes precedence over the same tag of
    the route_master. Missing or malformed values are logged to the city.
    """
    relation = self.element
    tags = relation["tags"]
    master_tags = master["tags"] if master else {}

    def inherited(key, default=None):
        # Route's own tag first, then the master's one, then the default.
        return tags.get(key, master_tags.get(key, default))

    if "ref" not in tags and "ref" not in master_tags:
        self.city.notice("Missing ref on a route", relation)
    self.ref = inherited("ref", tags.get("name", None))
    self.name = tags.get("name", None)
    self.mode = tags["route"]

    colour_is_missing = (
        "colour" not in tags and "colour" not in master_tags
    )
    if colour_is_missing and self.mode != "tram":
        self.city.notice("Missing colour on a route", relation)

    for attr, key in (("colour", "colour"), ("infill", "colour:infill")):
        try:
            value = normalize_colour(inherited(key))
        except ValueError as e:
            value = None
            self.city.warn(str(e), relation)
        setattr(self, attr, value)

    self.network = Route.get_network(relation)
    self.interval = Route.get_interval(tags) or Route.get_interval(
        master_tags
    )
    self.start_time, self.end_time = get_start_end_times(
        inherited("opening_hours", "")
    )
    if tags.get("public_transport:version") == "1":
        self.city.warn(
            "Public transport version is 1, which means the route "
            "is an unsorted pile of objects",
            relation,
        )
def process_stop_members(self) -> list[OsmElementT]:
    """Build self.stops from the stop/platform/station route members.

    Members with an "inactive" role are ignored. Members that belong to
    a known station are turned into RouteStop objects (or merged into an
    existing one); all other members are only validated and reported.

    :return: the member elements that are stop positions.
    :raises CriticalValidationError: when a member with a stop or
        platform role is absent from the dataset.
    """
    stations: set[StopArea] = set()  # temporary for recording stations
    seen_stops = False
    seen_platforms = False
    # None while stops are being appended; an index into self.stops
    # once a repeated sequence of members started to be matched.
    repeat_pos = None
    stop_position_elements: list[OsmElementT] = []
    for m in self.element["members"]:
        if "inactive" in m["role"]:
            continue
        k = el_id(m)
        if k in self.city.stations:
            st_list = self.city.stations[k]
            st = st_list[0]
            if len(st_list) > 1:
                self.city.error(
                    f"Ambiguous station {st.name} in route. Please "
                    "use stop_position or split interchange stations",
                    self.element,
                )
            el = self.city.elements[k]
            actual_role = RouteStop.get_actual_role(
                el, m["role"], self.city.modes
            )
            if actual_role:
                if m["role"] and actual_role not in m["role"]:
                    self.city.warn(
                        "Wrong role '{}' for {} {}".format(
                            m["role"], actual_role, k
                        ),
                        self.element,
                    )
                if repeat_pos is None:
                    if not self.stops or st not in stations:
                        stop = RouteStop(st)
                        self.stops.append(stop)
                        stations.add(st)
                    elif self.stops[-1].stoparea.id == st.id:
                        # Another member of the stop appended last.
                        stop = self.stops[-1]
                    else:
                        # We've got a repeat
                        if (
                            (seen_stops and seen_platforms)
                            or (
                                actual_role == "stop"
                                and not seen_platforms
                            )
                            or (
                                actual_role == "platform"
                                and not seen_stops
                            )
                        ):
                            # Circular route!
                            stop = RouteStop(st)
                            self.stops.append(stop)
                            stations.add(st)
                        else:
                            repeat_pos = 0
                if repeat_pos is not None:
                    if repeat_pos >= len(self.stops):
                        continue
                    # Check that the type matches
                    if (actual_role == "stop" and seen_stops) or (
                        actual_role == "platform" and seen_platforms
                    ):
                        self.city.error(
                            'Found an out-of-place {}: "{}" ({})'.format(
                                actual_role, el["tags"].get("name", ""), k
                            ),
                            self.element,
                        )
                        continue
                    # Find the matching stop starting with index repeat_pos
                    while (
                        repeat_pos < len(self.stops)
                        and self.stops[repeat_pos].stoparea.id != st.id
                    ):
                        repeat_pos += 1
                    if repeat_pos >= len(self.stops):
                        self.city.error(
                            "Incorrect order of {}s at {}".format(
                                actual_role, k
                            ),
                            self.element,
                        )
                        continue
                    stop = self.stops[repeat_pos]

                stop.add(m, self.element, self.city)
                if repeat_pos is None:
                    seen_stops |= stop.seen_stop or stop.seen_station
                    seen_platforms |= stop.seen_platform

                if StopArea.is_stop(el):
                    stop_position_elements.append(el)

                continue

        # From here on the member was not consumed as a stop of a known
        # station; it is only validated.
        if k not in self.city.elements:
            if "stop" in m["role"] or "platform" in m["role"]:
                raise CriticalValidationError(
                    f"{m['role']} {m['type']} {m['ref']} for route "
                    f"relation {self.element['id']} is not in the dataset"
                )
            continue
        el = self.city.elements[k]
        if "tags" not in el:
            self.city.error(
                f"Untagged object {k} in a route", self.element
            )
            continue

        is_under_construction = False
        for ck in CONSTRUCTION_KEYS:
            if ck in el["tags"]:
                self.city.warn(
                    f"Under construction {m['role'] or 'feature'} {k} "
                    "in route. Consider setting 'inactive' role or "
                    "removing construction attributes",
                    self.element,
                )
                is_under_construction = True
                break
        if is_under_construction:
            continue

        if Station.is_station(el, self.city.modes):
            # A station may be not included in this route due to previous
            # 'stop area has multiple stations' error. No other error
            # message is needed.
            pass
        elif el["tags"].get("railway") in ("station", "halt"):
            self.city.error(
                "Missing station={} on a {}".format(self.mode, m["role"]),
                el,
            )
        else:
            actual_role = RouteStop.get_actual_role(
                el, m["role"], self.city.modes
            )
            if actual_role:
                self.city.error(
                    f"{actual_role} {m['type']} {m['ref']} is not "
                    "connected to a station in route",
                    self.element,
                )
            elif not StopArea.is_track(el):
                self.city.warn(
                    "Unknown member type for {} {} in route".format(
                        m["type"], m["ref"]
                    ),
                    self.element,
                )
    return stop_position_elements
def process_tracks(
    self, stop_position_elements: list[OsmElementT]
) -> None:
    """Build self.tracks and bind the stops to them.

    Warns about stop positions that are not nodes of the track members,
    stores coordinates of the longest rail line in self.tracks (an empty
    list if any of its nodes has no coordinates) and, for routes with
    more than one stop, detects circularity, projects the stops onto the
    tracks and checks their order.
    """
    tracks, line_nodes = self.build_longest_line()

    for stop_el in stop_position_elements:
        stop_id = el_id(stop_el)
        if stop_id in line_nodes:
            continue
        self.city.warn(
            'Stop position "{}" ({}) is not on tracks'.format(
                stop_el["tags"].get("name", ""), stop_id
            ),
            self.element,
        )

    # self.tracks would be a list of (lon, lat) for the longest stretch.
    # Can be empty.
    elements = self.city.elements
    self.tracks = [el_center(elements.get(k)) for k in tracks]
    if None in self.tracks:
        # usually, extending BBOX for the city is needed
        self.tracks = []
        # Report only the first node that is absent from the dataset.
        missing_node = next((n for n in tracks if n not in elements), None)
        if missing_node is not None:
            self.city.warn(
                "The dataset is missing the railway tracks node "
                f"{missing_node}",
                self.element,
            )

    if len(self.stops) <= 1:
        return

    self.is_circular = self.stops[0].stoparea == self.stops[-1].stoparea
    if (
        self.is_circular
        and self.tracks
        and self.tracks[0] != self.tracks[-1]
    ):
        self.city.warn(
            "Non-closed rail sequence in a circular route",
            self.element,
        )

    projected_stops_data = self.project_stops_on_line()
    self.check_and_recover_stops_order(projected_stops_data)
    self.apply_projected_stops_data(projected_stops_data)
def apply_projected_stops_data(self, projected_stops_data: dict) -> None:
    """Copy projection results into the route and its stops.

    Stores the indexes of the first/last stops lying on the continuous
    track line as instance attributes, and for every projected stop
    stores its positions on rails and, when available, its improved
    coordinates.
    """
    self.first_stop_on_rails_index = projected_stops_data[
        "first_stop_on_rails_index"
    ]
    self.last_stop_on_rails_index = projected_stops_data[
        "last_stop_on_rails_index"
    ]

    for stop_data in projected_stops_data["stops_on_longest_line"]:
        route_stop = stop_data["route_stop"]
        route_stop.positions_on_rails = stop_data["positions_on_rails"]
        better_coords = stop_data["coords"]
        if better_coords:
            route_stop.stop = better_coords
def get_extended_tracks(self) -> RailT:
    """Return tracks extended with stops lying outside the rail line.

    The coordinates of leading/trailing self.stops that were not
    projected onto the longest tracks line are put before/after
    self.tracks. When no stop is on rails, the line consists of the
    stop coordinates only. A new list is returned.
    """
    first_on_rails = self.first_stop_on_rails_index
    if first_on_rails >= len(self.stops):
        return [route_stop.stop for route_stop in self.stops]

    last_on_rails = self.last_stop_on_rails_index
    leading = []
    trailing = []
    for i, route_stop in enumerate(self.stops):
        if i < first_on_rails:
            leading.append(route_stop.stop)
        if i > last_on_rails:
            trailing.append(route_stop.stop)
    return leading + self.tracks + trailing
def get_truncated_tracks(self, tracks: RailT) -> RailT:
    """Truncate leading/trailing segments of `tracks` param
    that are beyond the first and last stop locations.

    The ends of the truncated line are snapped to the coordinates of
    the first and last stops. Circular routes are not truncated.

    :param tracks: list of track points; it is never modified.
    :return: a new list.
    """
    # Work on a private copy. Without slicing below, the assignments to
    # tracks[-1] / tracks[0] would otherwise write into the caller's list.
    tracks = tracks.copy()
    if self.is_circular:
        return tracks

    # Both locations are computed before any truncation. Cutting the
    # tail first keeps the segment index of the first stop valid.
    first_stop_location = find_segment(self.stops[0].stop, tracks, 0)
    last_stop_location = find_segment(self.stops[-1].stop, tracks, 0)

    if last_stop_location != (None, None):
        seg2, u2 = last_stop_location
        if u2 == 0.0:
            # Make seg2 the segment the last_stop_location is
            # at the middle or end of
            seg2 -= 1
            # u2 = 1.0
        if seg2 + 2 < len(tracks):
            tracks = tracks[0 : seg2 + 2]  # noqa E203
        tracks[-1] = self.stops[-1].stop

    if first_stop_location != (None, None):
        seg1, u1 = first_stop_location
        if u1 == 1.0:
            # Make seg1 the segment the first_stop_location is
            # at the beginning or middle of
            seg1 += 1
            # u1 = 0.0
        if seg1 > 0:
            tracks = tracks[seg1:]
        tracks[0] = self.stops[0].stop

    return tracks
def are_tracks_complete(self) -> bool:
    """Tell whether both the first and the last stops lie on rails."""
    if self.first_stop_on_rails_index != 0:
        return False
    return self.last_stop_on_rails_index == len(self) - 1
def get_tracks_geometry(self) -> RailT:
    """Return the tracks, first extended and then truncated.

    The line is extended by get_extended_tracks() and the result is
    passed through get_truncated_tracks().
    """
    return self.get_truncated_tracks(self.get_extended_tracks())
def check_stops_order_by_angle(self) -> tuple[list[str], list[str]]:
    """Detect suspiciously sharp turns of the route at its stops.

    For every stop except the first and the last one, the angle between
    the directions to the neighbouring stops is computed. An angle below
    ALLOWED_ANGLE_BETWEEN_STOPS yields a warning; an angle below
    DISALLOWED_ANGLE_BETWEEN_STOPS yields an error instead.

    :return: tuple (warning messages, error messages). Both lists are
        empty for routes with fewer than three stops.
    """
    disorder_warnings: list[str] = []
    disorder_errors: list[str] = []
    stops = self.stops
    # range() is simply empty for fewer than three stops, whereas
    # islice() raises ValueError on the negative stop index that an
    # empty stops list would produce.
    for i in range(1, len(stops) - 1):
        route_stop = stops[i]
        angle = angle_between(
            stops[i - 1].stop,
            route_stop.stop,
            stops[i + 1].stop,
        )
        if angle >= ALLOWED_ANGLE_BETWEEN_STOPS:
            continue
        msg = (
            "Angle between stops around "
            f'"{route_stop.stoparea.name}" {route_stop.stop} '
            f"is too narrow, {angle} degrees"
        )
        if angle < DISALLOWED_ANGLE_BETWEEN_STOPS:
            disorder_errors.append(msg)
        else:
            disorder_warnings.append(msg)
    return disorder_warnings, disorder_errors
def check_stops_order_on_tracks_direct(
    self, stop_sequence: Iterator[dict]
) -> str | None:
    """Check that stops follow each other along the tracks,
    walking stop_sequence in the given order only.

    A circular route is allowed a single order violation.

    :param stop_sequence: dicts {'route_stop', 'positions_on_rails',
        'coords'} for RouteStops that belong to the longest contiguous
        sequence of tracks in a route.
    :return: error message on the first order violation or None.
    """
    violations_left = 1 if self.is_circular else 0
    furthest_position = -1
    for stop_data in stop_sequence:
        positions = stop_data["positions_on_rails"]
        # First occurrence of the stop that is not behind the previous one
        occurrence = next(
            (
                idx
                for idx, position in enumerate(positions)
                if not position < furthest_position
            ),
            len(positions),
        )
        if occurrence == len(positions):
            if violations_left <= 0:
                route_stop = stop_data["route_stop"]
                return (
                    "Stops on tracks are unordered near "
                    f'"{route_stop.stoparea.name}" {route_stop.stop}'
                )
            # Spend the allowed violation on the last occurrence.
            occurrence -= 1
            violations_left -= 1
        furthest_position = positions[occurrence]
    return None
def check_stops_order_on_tracks(
    self, projected_stops_data: dict
) -> str | None:
    """Check the order of stops on tracks in both directions.

    If the stops are unordered when walked directly but ordered when
    walked backwards, the tracks are considered reversed: a warning is
    logged, self.tracks is reversed in place and the stops are
    projected anew.

    :param projected_stops_data: info about RouteStops that belong to the
        longest contiguous sequence of tracks in a route. Updated in place
        if tracks reversing is performed.
    :return: error message on the first order violation or None.
    """
    stops_on_line = projected_stops_data["stops_on_longest_line"]
    direct_error = self.check_stops_order_on_tracks_direct(stops_on_line)
    if not direct_error:
        return direct_error

    reversed_error = self.check_stops_order_on_tracks_direct(
        reversed(stops_on_line)
    )
    if reversed_error is not None:
        return direct_error

    self.city.warn(
        "Tracks seem to go in the opposite direction to stops",
        self.element,
    )
    self.tracks.reverse()
    projected_stops_data.update(self.project_stops_on_line())
    return None
def check_stops_order(
    self, projected_stops_data: dict
) -> tuple[list[str], list[str]]:
    """Run both stop order checks: by angle and by position on tracks.

    :param projected_stops_data: passed to check_stops_order_on_tracks().
    :return: tuple (warning messages, error messages); the on-tracks
        error, if any, comes last in the errors list.
    """
    disorder_warnings, disorder_errors = self.check_stops_order_by_angle()
    on_tracks_error = self.check_stops_order_on_tracks(projected_stops_data)
    if on_tracks_error:
        disorder_errors.append(on_tracks_error)
    return disorder_warnings, disorder_errors
def check_and_recover_stops_order(
    self, projected_stops_data: dict
) -> None:
    """Check the order of stops and try to fix it with recovery data.

    Order warnings are always logged as notices. Order errors are
    logged as warnings if the stops were successfully resorted using
    the city recovery data, and as errors otherwise.

    :param projected_stops_data: may change if we need to reverse tracks
    """
    disorder_warnings, disorder_errors = self.check_stops_order(
        projected_stops_data
    )
    if not disorder_warnings and not disorder_errors:
        return

    resort_success = False
    if self.city.recovery_data:
        resort_success = self.try_resort_stops()

    for msg in disorder_warnings:
        self.city.notice(msg, self.element)

    if resort_success:
        for msg in disorder_errors:
            self.city.warn(
                "Fixed with recovery data: " + msg, self.element
            )
    else:
        for msg in disorder_errors:
            self.city.error(msg, self.element)
def try_resort_stops(self) -> bool:
    """Reorder self.stops according to a matching recovery itinerary.

    Precondition: self.city.recovery_data is not None.

    An itinerary of the route with the same (colour, ref) is suitable
    if it has exactly the same station names and no station is displaced
    by more than DISPLACEMENT_TOLERANCE. Among several suitable
    itineraries the one is chosen by the 'from'/'to' tags of the route.

    :return: success of station order recovering.
    """
    stops_by_name = {}  # station name => RouteStop
    for route_stop in self.stops:
        station = route_stop.stoparea.station
        name = station.name
        if name == "?" and station.int_name:
            name = station.int_name
        # We won't programmatically recover routes with repeating
        # stations: such cases are rare and deserve manual verification
        if name in stops_by_name:
            return False
        stops_by_name[name] = route_stop

    route_id = (self.colour, self.ref)
    if route_id not in self.city.recovery_data:
        return False

    own_names = list(stops_by_name.keys())

    def is_displaced(itinerary_stop):
        own_center = stops_by_name[
            itinerary_stop["name"]
        ].stoparea.station.center
        return (
            distance(itinerary_stop["center"], own_center)
            > DISPLACEMENT_TOLERANCE
        )

    suitable_itineraries = []
    for itinerary in self.city.recovery_data[route_id]:
        itinerary_names = [stop["name"] for stop in itinerary["stations"]]
        same_stations = len(own_names) == len(itinerary_names) and sorted(
            own_names
        ) == sorted(itinerary_names)
        if not same_stations:
            continue
        if not any(is_displaced(stop) for stop in itinerary["stations"]):
            suitable_itineraries.append(itinerary)

    if len(suitable_itineraries) == 0:
        return False

    if len(suitable_itineraries) == 1:
        matching_itinerary = suitable_itineraries[0]
    else:
        from_tag = self.element["tags"].get("from")
        to_tag = self.element["tags"].get("to")
        if not from_tag and not to_tag:
            return False
        matching_itineraries = [
            itin
            for itin in suitable_itineraries
            if from_tag
            and itin["from"] == from_tag
            or to_tag
            and itin["to"] == to_tag
        ]
        if len(matching_itineraries) != 1:
            return False
        matching_itinerary = matching_itineraries[0]

    self.stops = [
        stops_by_name[stop["name"]]
        for stop in matching_itinerary["stations"]
    ]
    return True
def get_end_transfers(self) -> tuple[IdT, IdT]:
    """Return ids identifying the terminal points of the route.

    Using transfer ids because a train can arrive at different
    stations within a transfer. But disregard transfer that may give
    an impression of a circular route (for example,
    Simonis / Elisabeth station and route 2 in Brussels): in that case
    the stoparea ids are returned.
    """
    first_area = self[0].stoparea
    last_area = self[-1].stoparea
    ends_share_transfer = (
        first_area.transfer is not None
        and first_area.transfer == last_area.transfer
    )
    if ends_share_transfer:
        return first_area.id, last_area.id
    return (
        first_area.transfer or first_area.id,
        last_area.transfer or last_area.id,
    )
def get_transfers_sequence(self) -> list[IdT]:
    """Return a list of stoparea or transfer (if not None) ids.

    When the first and the last stops belong to the same transfer, the
    ends of the list are taken from get_end_transfers() instead.
    """
    transfer_seq = []
    for stop in self:
        stoparea = stop.stoparea
        transfer_seq.append(stoparea.transfer or stoparea.id)

    first_transfer = self[0].stoparea.transfer
    if (
        first_transfer is not None
        and first_transfer == self[-1].stoparea.transfer
    ):
        first_end, last_end = self.get_end_transfers()
        transfer_seq[0] = first_end
        transfer_seq[-1] = last_end
    return transfer_seq
def __len__(self) -> int:
    """Return the number of stops in the route."""
    return len(self.stops)
def __getitem__(self, i) -> RouteStop:
    """Return the stop at index `i` of self.stops."""
    return self.stops[i]
def __iter__(self) -> Iterator[RouteStop]:
    """Iterate over the stops of the route in their order."""
    return iter(self.stops)
def __repr__(self) -> str:
    """Return a debug representation of the route.

    Works for a route without stops as well: the line length and the
    terminal stops are shown as None in that case.
    """
    stops = self.stops
    if stops:
        line_length = stops[-1].distance
        first_stop, last_stop = stops[0], stops[-1]
    else:
        # Indexing an empty stops list would raise IndexError.
        line_length = first_stop = last_stop = None
    return (
        "Route(id={}, mode={}, ref={}, name={}, network={}, interval={}, "
        "circular={}, num_stops={}, line_length={} m, from={}, to={})"
    ).format(
        self.id,
        self.mode,
        self.ref,
        self.name,
        self.network,
        self.interval,
        self.is_circular,
        len(stops),
        line_length,
        first_stop,
        last_stop,
    )

View file

@ -0,0 +1,464 @@
from __future__ import annotations
import typing
from collections.abc import Iterator
from typing import TypeVar
from subways.consts import MAX_DISTANCE_STOP_TO_LINE
from subways.css_colours import normalize_colour
from subways.geom_utils import distance, project_on_line
from subways.osm_element import el_id
from subways.structure.route import Route
from subways.structure.stop_area import StopArea
from subways.types import IdT, OsmElementT
if typing.TYPE_CHECKING:
from subways.structure.city import City
from subways.structure.route_stop import RouteStop
# Mismatching stops of twin routes that lie closer than this are reported
# as candidates for a single stoparea or a transfer.
SUGGEST_TRANSFER_MIN_DISTANCE = 100  # in meters

# Element type of the sequences compared by
# RouteMaster.find_common_circular_subsequence().
T = TypeVar("T")
class RouteMaster:
def __init__(self, city: City, master: OsmElementT = None) -> None:
self.city = city
self.routes = []
self.best: Route = None
self.id: IdT = el_id(master)
self.has_master = master is not None
self.interval_from_master = False
if master:
self.ref = master["tags"].get(
"ref", master["tags"].get("name", None)
)
try:
self.colour = normalize_colour(
master["tags"].get("colour", None)
)
except ValueError:
self.colour = None
try:
self.infill = normalize_colour(
master["tags"].get("colour:infill", None)
)
except ValueError:
self.infill = None
self.network = Route.get_network(master)
self.mode = master["tags"].get(
"route_master", None
) # This tag is required, but okay
self.name = master["tags"].get("name", None)
self.interval = Route.get_interval(master["tags"])
self.interval_from_master = self.interval is not None
else:
self.ref = None
self.colour = None
self.infill = None
self.network = None
self.mode = None
self.name = None
self.interval = None
def stopareas(self) -> Iterator[StopArea]:
yielded_stopareas = set()
for route in self:
for stoparea in route.stopareas():
if stoparea not in yielded_stopareas:
yield stoparea
yielded_stopareas.add(stoparea)
def add(self, route: Route) -> None:
if not self.network:
self.network = route.network
elif route.network and route.network != self.network:
self.city.error(
'Route has different network ("{}") from master "{}"'.format(
route.network, self.network
),
route.element,
)
if not self.colour:
self.colour = route.colour
elif route.colour and route.colour != self.colour:
self.city.notice(
'Route "{}" has different colour from master "{}"'.format(
route.colour, self.colour
),
route.element,
)
if not self.infill:
self.infill = route.infill
elif route.infill and route.infill != self.infill:
self.city.notice(
(
f'Route "{route.infill}" has different infill colour '
f'from master "{self.infill}"'
),
route.element,
)
if not self.ref:
self.ref = route.ref
elif route.ref != self.ref:
self.city.notice(
'Route "{}" has different ref from master "{}"'.format(
route.ref, self.ref
),
route.element,
)
if not self.name:
self.name = route.name
if not self.mode:
self.mode = route.mode
elif route.mode != self.mode:
self.city.error(
"Incompatible PT mode: master has {} and route has {}".format(
self.mode, route.mode
),
route.element,
)
return
if not self.interval_from_master and route.interval:
if not self.interval:
self.interval = route.interval
else:
self.interval = min(self.interval, route.interval)
# Choose minimal id for determinancy
if not self.has_master and (not self.id or self.id > route.id):
self.id = route.id
self.routes.append(route)
if (
not self.best
or len(route.stops) > len(self.best.stops)
or (
# Choose route with minimal id for determinancy
len(route.stops) == len(self.best.stops)
and route.element["id"] < self.best.element["id"]
)
):
self.best = route
def get_meaningful_routes(self) -> list[Route]:
return [route for route in self if len(route) >= 2]
def find_twin_routes(self) -> dict[Route, Route]:
"""Two non-circular routes are twins if they have the same end
stations and opposite directions, and the number of stations is
the same or almost the same. We'll then find stops that are present
in one direction and is missing in another direction - to warn.
"""
twin_routes = {} # route => "twin" route
for route in self.get_meaningful_routes():
if route.is_circular:
continue # Difficult to calculate. TODO(?) in the future
if route in twin_routes:
continue
route_transfer_ids = set(route.get_transfers_sequence())
ends = route.get_end_transfers()
ends_reversed = ends[::-1]
twin_candidates = [
r
for r in self
if not r.is_circular
and r not in twin_routes
and r.get_end_transfers() == ends_reversed
# If absolute or relative difference in station count is large,
# possibly it's an express version of a route - skip it.
and (
abs(len(r) - len(route)) <= 2
or abs(len(r) - len(route)) / max(len(r), len(route))
<= 0.2
)
]
if not twin_candidates:
continue
twin_route = min(
twin_candidates,
key=lambda r: len(
route_transfer_ids ^ set(r.get_transfers_sequence())
),
)
twin_routes[route] = twin_route
twin_routes[twin_route] = route
return twin_routes
def check_return_routes(self) -> None:
"""Check if a route has return direction, and if twin routes
miss stations.
"""
meaningful_routes = self.get_meaningful_routes()
if len(meaningful_routes) == 0:
self.city.error(
f"An empty route master {self.id}. "
"Please set construction:route if it is under construction"
)
elif len(meaningful_routes) == 1:
log_function = (
self.city.error
if not self.best.is_circular
else self.city.notice
)
log_function(
"Only one route in route_master. "
"Please check if it needs a return route",
self.best.element,
)
else:
self.check_return_circular_routes()
self.check_return_noncircular_routes()
def check_return_noncircular_routes(self) -> None:
routes = [
route
for route in self.get_meaningful_routes()
if not route.is_circular
]
all_ends = {route.get_end_transfers(): route for route in routes}
for route in routes:
ends = route.get_end_transfers()
if ends[::-1] not in all_ends:
self.city.notice(
"Route does not have a return direction", route.element
)
twin_routes = self.find_twin_routes()
for route1, route2 in twin_routes.items():
if route1.id > route2.id:
continue # to process a pair of routes only once
# and to ensure the order of routes in the pair
self.alert_twin_routes_differ(route1, route2)
def check_return_circular_routes(self) -> None:
routes = {
route
for route in self.get_meaningful_routes()
if route.is_circular
}
routes_having_backward = set()
for route in routes:
if route in routes_having_backward:
continue
transfer_sequence1 = [
stop.stoparea.transfer or stop.stoparea.id for stop in route
]
transfer_sequence1.pop()
for potential_backward_route in routes - {route}:
transfer_sequence2 = [
stop.stoparea.transfer or stop.stoparea.id
for stop in potential_backward_route
][
-2::-1
] # truncate repeated first stop and reverse
common_subsequence = self.find_common_circular_subsequence(
transfer_sequence1, transfer_sequence2
)
if len(common_subsequence) >= 0.8 * min(
len(transfer_sequence1), len(transfer_sequence2)
):
routes_having_backward.add(route)
routes_having_backward.add(potential_backward_route)
break
for route in routes - routes_having_backward:
self.city.notice(
"Route does not have a return direction", route.element
)
@staticmethod
def find_common_circular_subsequence(
seq1: list[T], seq2: list[T]
) -> list[T]:
"""seq1 and seq2 are supposed to be stops of some circular routes.
Prerequisites to rely on the result:
- elements of each sequence are not repeated
- the order of stations is not violated.
Under these conditions we don't need LCS algorithm. Linear scan is
sufficient.
"""
i1, i2 = -1, -1
for i1, x in enumerate(seq1):
try:
i2 = seq2.index(x)
except ValueError:
continue
else:
# x is found both in seq1 and seq2
break
if i2 == -1:
return []
# Shift cyclically so that the common element takes the first position
# both in seq1 and seq2
seq1 = seq1[i1:] + seq1[:i1]
seq2 = seq2[i2:] + seq2[:i2]
common_subsequence = []
i2 = 0
for x in seq1:
try:
i2 = seq2.index(x, i2)
except ValueError:
continue
common_subsequence.append(x)
i2 += 1
if i2 >= len(seq2):
break
return common_subsequence
def alert_twin_routes_differ(self, route1: Route, route2: Route) -> None:
"""Arguments are that route1.id < route2.id"""
(
stops_missing_from_route1,
stops_missing_from_route2,
stops_that_dont_match,
) = self.calculate_twin_routes_diff(route1, route2)
for st in stops_missing_from_route1:
if (
not route1.are_tracks_complete()
or (
projected_point := project_on_line(
st.stoparea.center, route1.tracks
)["projected_point"]
)
is not None
and distance(st.stoparea.center, projected_point)
<= MAX_DISTANCE_STOP_TO_LINE
):
self.city.notice(
f"Stop {st.stoparea.station.name} {st.stop} is included "
f"in the {route2.id} but not included in {route1.id}",
route1.element,
)
for st in stops_missing_from_route2:
if (
not route2.are_tracks_complete()
or (
projected_point := project_on_line(
st.stoparea.center, route2.tracks
)["projected_point"]
)
is not None
and distance(st.stoparea.center, projected_point)
<= MAX_DISTANCE_STOP_TO_LINE
):
self.city.notice(
f"Stop {st.stoparea.station.name} {st.stop} is included "
f"in the {route1.id} but not included in {route2.id}",
route2.element,
)
for st1, st2 in stops_that_dont_match:
if (
st1.stoparea.station == st2.stoparea.station
or distance(st1.stop, st2.stop) < SUGGEST_TRANSFER_MIN_DISTANCE
):
self.city.notice(
"Should there be one stoparea or a transfer between "
f"{st1.stoparea.station.name} {st1.stop} and "
f"{st2.stoparea.station.name} {st2.stop}?",
route1.element,
)
@staticmethod
def calculate_twin_routes_diff(route1: Route, route2: Route) -> tuple:
"""WagnerFischer algorithm for stops diff in two twin routes."""
stops1 = route1.stops
stops2 = route2.stops[::-1]
def stops_match(stop1: RouteStop, stop2: RouteStop) -> bool:
return (
stop1.stoparea == stop2.stoparea
or stop1.stoparea.transfer is not None
and stop1.stoparea.transfer == stop2.stoparea.transfer
)
d = [[0] * (len(stops2) + 1) for _ in range(len(stops1) + 1)]
d[0] = list(range(len(stops2) + 1))
for i in range(len(stops1) + 1):
d[i][0] = i
for i in range(1, len(stops1) + 1):
for j in range(1, len(stops2) + 1):
d[i][j] = (
d[i - 1][j - 1]
if stops_match(stops1[i - 1], stops2[j - 1])
else min((d[i - 1][j], d[i][j - 1], d[i - 1][j - 1])) + 1
)
stops_missing_from_route1: list[RouteStop] = []
stops_missing_from_route2: list[RouteStop] = []
stops_that_dont_match: list[tuple[RouteStop, RouteStop]] = []
i = len(stops1)
j = len(stops2)
while not (i == 0 and j == 0):
action = None
if i > 0 and j > 0:
match = stops_match(stops1[i - 1], stops2[j - 1])
if match and d[i - 1][j - 1] == d[i][j]:
action = "no"
elif not match and d[i - 1][j - 1] + 1 == d[i][j]:
action = "change"
if not action and i > 0 and d[i - 1][j] + 1 == d[i][j]:
action = "add_2"
if not action and j > 0 and d[i][j - 1] + 1 == d[i][j]:
action = "add_1"
match action:
case "add_1":
stops_missing_from_route1.append(stops2[j - 1])
j -= 1
case "add_2":
stops_missing_from_route2.append(stops1[i - 1])
i -= 1
case _:
if action == "change":
stops_that_dont_match.append(
(stops1[i - 1], stops2[j - 1])
)
i -= 1
j -= 1
return (
stops_missing_from_route1,
stops_missing_from_route2,
stops_that_dont_match,
)
def __len__(self) -> int:
return len(self.routes)
def __getitem__(self, i) -> Route:
return self.routes[i]
def __iter__(self) -> Iterator[Route]:
return iter(self.routes)
def __repr__(self) -> str:
return (
f"RouteMaster(id={self.id}, mode={self.mode}, ref={self.ref}, "
f"name={self.name}, network={self.network}, "
f"num_variants={len(self.routes)}"
)

View file

@ -0,0 +1,122 @@
from __future__ import annotations
import typing
from subways.osm_element import el_center, el_id
from subways.structure.station import Station
from subways.structure.stop_area import StopArea
from subways.types import LonLat, OsmElementT
if typing.TYPE_CHECKING:
from subways.structure.city import City
class RouteStop:
def __init__(self, stoparea: StopArea) -> None:
self.stoparea: StopArea = stoparea
self.stop: LonLat = None # Stop position, possibly projected
self.distance = 0 # In meters from the start of the route
self.platform_entry = None # Platform el_id
self.platform_exit = None # Platform el_id
self.can_enter = False
self.can_exit = False
self.seen_stop = False
self.seen_platform_entry = False
self.seen_platform_exit = False
self.seen_station = False
@property
def seen_platform(self) -> bool:
return self.seen_platform_entry or self.seen_platform_exit
@staticmethod
def get_actual_role(
el: OsmElementT, role: str, modes: set[str]
) -> str | None:
if StopArea.is_stop(el):
return "stop"
elif StopArea.is_platform(el):
return "platform"
elif Station.is_station(el, modes):
if "platform" in role:
return "platform"
else:
return "stop"
return None
def add(self, member: dict, relation: OsmElementT, city: City) -> None:
el = city.elements[el_id(member)]
role = member["role"]
if StopArea.is_stop(el):
if "platform" in role:
city.warn("Stop position in a platform role in a route", el)
if el["type"] != "node":
city.error("Stop position is not a node", el)
self.stop = el_center(el)
if "entry_only" not in role:
self.can_exit = True
if "exit_only" not in role:
self.can_enter = True
elif Station.is_station(el, city.modes):
if el["type"] != "node":
city.notice("Station in route is not a node", el)
if not self.seen_stop and not self.seen_platform:
self.stop = el_center(el)
self.can_enter = True
self.can_exit = True
elif StopArea.is_platform(el):
if "stop" in role:
city.warn("Platform in a stop role in a route", el)
if "exit_only" not in role:
self.platform_entry = el_id(el)
self.can_enter = True
if "entry_only" not in role:
self.platform_exit = el_id(el)
self.can_exit = True
if not self.seen_stop:
self.stop = el_center(el)
multiple_check = False
actual_role = RouteStop.get_actual_role(el, role, city.modes)
if actual_role == "platform":
if role == "platform_entry_only":
multiple_check = self.seen_platform_entry
self.seen_platform_entry = True
elif role == "platform_exit_only":
multiple_check = self.seen_platform_exit
self.seen_platform_exit = True
else:
if role != "platform" and "stop" not in role:
city.warn(
f'Platform "{el["tags"].get("name", "")}" '
f'({el_id(el)}) with invalid role "{role}" in route',
relation,
)
multiple_check = self.seen_platform
self.seen_platform_entry = True
self.seen_platform_exit = True
elif actual_role == "stop":
multiple_check = self.seen_stop
self.seen_stop = True
if multiple_check:
log_function = city.error if actual_role == "stop" else city.notice
log_function(
f'Multiple {actual_role}s for a station "'
f'{el["tags"].get("name", "")} '
f"({el_id(el)}) in a route relation",
relation,
)
def __repr__(self) -> str:
    """Return a short debug representation of this route stop."""
    fields = (
        f"stop={self.stop}",
        f"pl_entry={self.platform_entry}",
        f"pl_exit={self.platform_exit}",
        f"stoparea={self.stoparea}",
    )
    return "RouteStop(" + ", ".join(fields) + ")"

View file

@ -0,0 +1,62 @@
from __future__ import annotations
import typing
from subways.consts import ALL_MODES, CONSTRUCTION_KEYS
from subways.css_colours import normalize_colour
from subways.osm_element import el_center, el_id
from subways.types import IdT, OsmElementT
if typing.TYPE_CHECKING:
from subways.structure.city import City
class Station:
    """A station built from a single OSM station-like element."""

    def __init__(self, el: OsmElementT, city: City) -> None:
        """Call this with a railway=station OSM feature.

        An invalid "colour" tag is reported via ``city.warn`` and the
        colour is left as None. An Exception is raised when the element
        has no center.
        """
        self.id: IdT = el_id(el)
        self.element: OsmElementT = el
        self.modes = Station.get_modes(el)

        tags = el["tags"]
        self.name = tags.get("name", "?")
        # "int_name" wins over "name:en"; None when both are absent
        self.int_name = tags.get("int_name", tags.get("name:en", None))

        self.colour = None
        try:
            self.colour = normalize_colour(tags.get("colour", None))
        except ValueError as e:
            city.warn(str(e), el)

        self.center = el_center(el)
        if self.center is None:
            raise Exception(f"Could not find center of {el}")

    @staticmethod
    def get_modes(el: OsmElementT) -> set[str]:
        """Collect transport modes from "<mode>=yes" and "station" tags."""
        tags = el["tags"]
        modes = set()
        for mode in ALL_MODES:
            if tags.get(mode) == "yes":
                modes.add(mode)
        station_kind = tags.get("station")
        if station_kind:
            modes.add(station_kind)
        return modes

    @staticmethod
    def is_station(el: OsmElementT, modes: set[str]) -> bool:
        """Tell whether the element is a station for any of the modes."""
        # public_transport=station is too ambiguous and unspecific to use,
        # so we expect it to be backed by railway=station.
        tags = el.get("tags", {})
        railway = tags.get("railway")
        if "tram" in modes and railway == "tram_stop":
            return True
        if railway not in ("station", "halt"):
            return False
        if any(key in tags for key in CONSTRUCTION_KEYS):
            return False
        # Not checking for station=train, obviously
        return "train" in modes or not Station.get_modes(el).isdisjoint(modes)

    def __repr__(self) -> str:
        """Return a short debug representation of the station."""
        joined_modes = ",".join(self.modes)
        return (
            f"Station(id={self.id}, modes={joined_modes}, "
            f"name={self.name}, center={self.center})"
        )

View file

@ -0,0 +1,191 @@
from __future__ import annotations
import typing
from itertools import chain
from subways.consts import RAILWAY_TYPES
from subways.css_colours import normalize_colour
from subways.geom_utils import distance
from subways.osm_element import el_id, el_center
from subways.structure.station import Station
from subways.types import IdT, OsmElementT
if typing.TYPE_CHECKING:
from subways.structure.city import City
MAX_DISTANCE_TO_ENTRANCES = 300 # in meters
class StopArea:
    """A station together with its stops, platforms and entrances."""

    @staticmethod
    def is_stop(el: OsmElementT) -> bool:
        """Tell whether the element is tagged as a stop position."""
        if "tags" not in el:
            return False
        tags = el["tags"]
        return (
            tags.get("railway") == "stop"
            or tags.get("public_transport") == "stop_position"
        )

    @staticmethod
    def is_platform(el: OsmElementT) -> bool:
        """Tell whether the element is tagged as a platform."""
        if "tags" not in el:
            return False
        tags = el["tags"]
        return (
            tags.get("railway") in ("platform", "platform_edge")
            or tags.get("public_transport") == "platform"
        )

    @staticmethod
    def is_track(el: OsmElementT) -> bool:
        """Tell whether the element is a way with a railway track type."""
        if el["type"] == "way" and "tags" in el:
            return el["tags"].get("railway") in RAILWAY_TYPES
        return False

    def __init__(
        self,
        station: Station,
        city: City,
        stop_area: OsmElementT | None = None,
    ) -> None:
        """Call this with a Station object.

        With a stop_area relation, its tags override the station's name
        and colour and its members are sorted into stops, platforms,
        entrances and exits. Without one, nearby entrances are attached.
        """
        self.element: OsmElementT = stop_area or station.element
        self.id: IdT = el_id(self.element)
        self.station: Station = station

        self.stops = set()  # set of el_ids of stop_positions
        self.platforms = set()  # set of el_ids of platforms
        # el_ids of subway_entrance/train_station_entrance elements
        self.exits = set()  # ... usable for leaving the platform
        self.entrances = set()  # ... usable for entering the platform
        self.center = None  # lon, lat of the station centre point
        self.centers = {}  # el_id -> (lon, lat) for all elements
        self.transfer = None  # el_id of a transfer relation

        # Start from the station's attributes
        self.modes = station.modes
        self.name = station.name
        self.int_name = station.int_name
        self.colour = station.colour

        if not stop_area:
            self._add_nearby_entrances(station, city)
        else:
            area_tags = stop_area["tags"]
            self.name = area_tags.get("name", self.name)
            self.int_name = area_tags.get(
                "int_name", area_tags.get("name:en", self.int_name)
            )
            try:
                area_colour = normalize_colour(area_tags.get("colour"))
            except ValueError as e:
                city.warn(str(e), stop_area)
            else:
                self.colour = area_colour or self.colour
            self._process_members(station, city, stop_area)

        reported_el = stop_area or station.element
        if self.exits and not self.entrances:
            city.warn("Only exits for a station, no entrances", reported_el)
        if self.entrances and not self.exits:
            city.warn("No exits for a station", reported_el)

        for member_id in self.get_elements():
            self.centers[member_id] = el_center(city.elements[member_id])

        # Calculate the center point of the station. This algorithm
        # cannot rely on a station node, since many stop_areas can share
        # one. Basically it averages center points of all platforms
        # and stop positions.
        anchor_ids = list(chain(self.stops, self.platforms))
        if not anchor_ids:
            self.center = station.center
        else:
            lon_total = 0
            lat_total = 0
            for anchor_id in anchor_ids:
                anchor_center = self.centers[anchor_id]
                lon_total += anchor_center[0]
                lat_total += anchor_center[1]
            anchors_count = len(anchor_ids)
            self.center = [
                lon_total / anchors_count,
                lat_total / anchors_count,
            ]

    def _process_members(
        self, station: Station, city: City, stop_area: OsmElementT
    ) -> None:
        """Sort the members of the stop_area relation into categories."""
        entrance_values = ("subway_entrance", "train_station_entrance")
        has_tracks = False
        for member in stop_area["members"]:
            member_id = el_id(member)
            member_el = city.elements.get(member_id)
            if not member_el or "tags" not in member_el:
                continue

            if Station.is_station(member_el, city.modes):
                if member_id != station.id:
                    city.error("Stop area has multiple stations", stop_area)
                continue
            if StopArea.is_stop(member_el):
                self.stops.add(member_id)
                continue
            if StopArea.is_platform(member_el):
                self.platforms.add(member_id)
                continue

            member_tags = member_el["tags"]
            entrance_type = member_tags.get("railway")
            if entrance_type in entrance_values:
                if member_el["type"] != "node":
                    city.warn(f"{entrance_type} is not a node", member_el)
                # Both the "entrance" tag and the member role may
                # restrict the direction in which the entrance is used.
                if (
                    member_tags.get("entrance") != "exit"
                    and member["role"] != "exit_only"
                ):
                    self.entrances.add(member_id)
                if (
                    member_tags.get("entrance") != "entrance"
                    and member["role"] != "entry_only"
                ):
                    self.exits.add(member_id)
            elif StopArea.is_track(member_el):
                has_tracks = True

        if has_tracks:
            city.warn("Tracks in a stop_area relation", stop_area)

    def _add_nearby_entrances(self, station: Station, city: City) -> None:
        """Attach entrances within MAX_DISTANCE_TO_ENTRANCES of the station.

        Entrances whose id is found in ``city.stop_areas`` are skipped.
        """
        origin = station.center
        entrance_values = ("subway_entrance", "train_station_entrance")
        for candidate in city.elements.values():
            if "tags" not in candidate:
                continue
            entrance_type = candidate["tags"].get("railway")
            if entrance_type not in entrance_values:
                continue

            entrance_id = el_id(candidate)
            if entrance_id in city.stop_areas:
                continue  # This entrance belongs to some stop_area

            entrance_center = el_center(candidate)
            if (
                entrance_center
                and distance(origin, entrance_center)
                <= MAX_DISTANCE_TO_ENTRANCES
            ):
                if candidate["type"] != "node":
                    city.warn(f"{entrance_type} is not a node", candidate)
                direction = candidate["tags"].get("entrance")
                if direction != "exit":
                    self.entrances.add(entrance_id)
                if direction != "entrance":
                    self.exits.add(entrance_id)

    def get_elements(self) -> set[IdT]:
        """Return ids of the stop area, its station and all its members."""
        return {self.id, self.station.id}.union(
            self.entrances, self.exits, self.stops, self.platforms
        )

    def __repr__(self) -> str:
        """Return a short debug representation of the stop area."""
        details = (
            f"id={self.id}, name={self.name}, station={self.station},"
            f" transfer={self.transfer}, center={self.center}"
        )
        return f"StopArea({details})"

View file

@ -1,12 +1,20 @@
from __future__ import annotations
import json
import logging
import typing
from collections import OrderedDict
from io import BufferedIOBase
from typing import Any, TextIO
from subway_structure import City, OsmElementT, StopArea
from subways.types import OsmElementT
if typing.TYPE_CHECKING:
from subways.structure.city import City
from subways.structure.stop_area import StopArea
def load_xml(f: TextIO | str) -> list[OsmElementT]:
def load_xml(f: BufferedIOBase | str) -> list[OsmElementT]:
try:
from lxml import etree
except ImportError:
@ -257,7 +265,7 @@ def write_recovery_data(
def make_city_recovery_data(
city: City,
) -> dict[tuple[str | None, str | None], list[dict]]:
routes: dict[tuple(str | None, str | None), list[dict]] = {}
routes: dict[tuple[str | None, str | None], list[dict]] = {}
for route in city:
# Recovery is based primarily on route/station names/refs.
# If route's ref/colour changes, the route won't be used.

View file

Can't render this file because it has a wrong number of fields in line 2.

View file

@ -42,15 +42,108 @@ metro_samples = [
"cities_info": [
{
"num_stations": 2,
"num_lines": 1,
"num_light_lines": 0,
"num_interchanges": 0,
},
],
"errors": [],
"warnings": [],
"notices": [],
},
{
"name": "Station colour tag present/absent, correct/incorrect, on bear station / with stop_area", # noqa E501
"xml": """<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>
<node id='1' visible='true' version='1' lat='0.0' lon='0.0'>
<tag k='name' v='Station 1' />
<tag k='note' v='no &apos;colour&apos; tag' />
<tag k='railway' v='station' />
<tag k='station' v='subway' />
</node>
<node id='2' visible='true' version='1' lat='0.0' lon='0.01'>
<tag k='colour' v='red' />
<tag k='name' v='Station 2' />
<tag k='note' v='correct colour name' />
<tag k='railway' v='station' />
<tag k='station' v='subway' />
</node>
<node id='3' visible='true' version='1' lat='0.0' lon='0.02'>
<tag k='colour' v='#C1e' />
<tag k='name' v='Station 3' />
<tag k='note' v='correct colour 3-digit hex code' />
<tag k='railway' v='station' />
<tag k='station' v='subway' />
</node>
<node id='4' visible='true' version='1' lat='0.0' lon='0.03'>
<tag k='colour' v='incorrect' />
<tag k='name' v='Station 4' />
<tag k='note' v='incorrect &apos;colour&apos; tag' />
<tag k='railway' v='station' />
<tag k='station' v='subway' />
</node>
<node id='5' visible='true' version='1' lat='0.0' lon='0.04'>
<tag k='colour' v='#CD853F' />
<tag k='name' v='Station 5' />
<tag k='note' v='correct colour 6-digit hex code' />
<tag k='railway' v='station' />
<tag k='station' v='subway' />
</node>
<node id='6' visible='true' version='1' lat='0.0' lon='0.05'>
<tag k='colour' v='incorrect' />
<tag k='name' v='Station 6' />
<tag k='note' v='incorrect colour; station in a stop_area' />
<tag k='railway' v='station' />
<tag k='station' v='subway' />
</node>
<relation id='1' visible='true' version='1'>
<member type='node' ref='1' role='' />
<member type='node' ref='2' role='' />
<member type='node' ref='3' role='' />
<member type='node' ref='4' role='' />
<member type='node' ref='5' role='' />
<member type='node' ref='6' role='' />
<tag k='name' v='Forward' />
<tag k='ref' v='1' />
<tag k='route' v='subway' />
<tag k='type' v='route' />
</relation>
<relation id='2' visible='true' version='1'>
<member type='node' ref='6' role='' />
<member type='node' ref='5' role='' />
<member type='node' ref='4' role='' />
<member type='node' ref='3' role='' />
<member type='node' ref='2' role='' />
<member type='node' ref='1' role='' />
<tag k='name' v='Backward' />
<tag k='ref' v='1' />
<tag k='route' v='subway' />
<tag k='type' v='route' />
</relation>
<relation id='3' visible='true' version='1'>
<member type='relation' ref='1' role='' />
<member type='relation' ref='2' role='' />
<tag k='colour' v='red' />
<tag k='ref' v='1' />
<tag k='route_master' v='subway' />
<tag k='type' v='route_master' />
</relation>
<relation id='600' visible='true' version='1'>
<member type='node' ref='6' role='' />
<tag k='public_transport' v='stop_area' />
<tag k='type' v='public_transport' />
</relation>
</osm>
""",
"cities_info": [
{
"num_stations": 6,
},
],
"errors": [],
"warnings": [
'Unknown colour code: incorrect (node 4, "Station 4")',
'Unknown colour code: incorrect (node 6, "Station 6")',
],
"notices": [],
},
{
"name": "Bad station order",
"xml": """<?xml version='1.0' encoding='UTF-8'?>

View file

@ -1,5 +1,5 @@
from tests.sample_data_for_build_tracks import metro_samples
from tests.util import JsonLikeComparisonMixin, TestCase
from subways.tests.sample_data_for_build_tracks import metro_samples
from subways.tests.util import JsonLikeComparisonMixin, TestCase
class TestOneRouteTracks(JsonLikeComparisonMixin, TestCase):

View file

@ -1,9 +1,9 @@
import io
from unittest import TestCase
from process_subways import calculate_centers
from subway_io import load_xml
from tests.sample_data_for_center_calculation import metro_samples
from subways.validation import calculate_centers
from subways.subway_io import load_xml
from subways.tests.sample_data_for_center_calculation import metro_samples
class TestCenterCalculation(TestCase):

View file

@ -1,12 +1,12 @@
import itertools
from tests.sample_data_for_error_messages import (
from subways.tests.sample_data_for_error_messages import (
metro_samples as metro_samples_error,
)
from tests.sample_data_for_twin_routes import (
from subways.tests.sample_data_for_twin_routes import (
metro_samples as metro_samples_route_masters,
)
from tests.util import TestCase
from subways.tests.util import TestCase
class TestValidationMessages(TestCase):

View file

@ -1,7 +1,7 @@
from copy import deepcopy
from tests.sample_data_for_outputs import metro_samples
from tests.util import TestCase, JsonLikeComparisonMixin
from subways.tests.sample_data_for_outputs import metro_samples
from subways.tests.util import TestCase, JsonLikeComparisonMixin
class TestTransfers(JsonLikeComparisonMixin, TestCase):

View file

@ -2,10 +2,14 @@ import csv
from functools import partial
from pathlib import Path
from processors._common import transit_to_dict
from processors.gtfs import dict_to_row, GTFS_COLUMNS, transit_data_to_gtfs
from tests.sample_data_for_outputs import metro_samples
from tests.util import TestCase
from subways.processors._common import transit_to_dict
from subways.processors.gtfs import (
dict_to_row,
GTFS_COLUMNS,
transit_data_to_gtfs,
)
from subways.tests.sample_data_for_outputs import metro_samples
from subways.tests.util import TestCase
class TestGTFS(TestCase):

View file

@ -1,8 +1,8 @@
from operator import itemgetter
from processors.mapsme import transit_data_to_mapsme
from tests.sample_data_for_outputs import metro_samples
from tests.util import JsonLikeComparisonMixin, TestCase
from subways.processors.mapsme import transit_data_to_mapsme
from subways.tests.sample_data_for_outputs import metro_samples
from subways.tests.util import JsonLikeComparisonMixin, TestCase
class TestMapsme(JsonLikeComparisonMixin, TestCase):

View file

@ -1,6 +1,6 @@
from unittest import TestCase, mock
from process_subways import compose_overpass_request, overpass_request
from subways.overpass import compose_overpass_request, overpass_request
class TestOverpassQuery(TestCase):
@ -150,11 +150,11 @@ class TestOverpassQuery(TestCase):
"%28._%3B%3E%3E%3B%29%3Bout%20body%20center%20qt%3B"
)
with mock.patch("process_subways.json.load") as load_mock:
with mock.patch("subways.overpass.json.load") as load_mock:
load_mock.return_value = {"elements": []}
with mock.patch(
"process_subways.urllib.request.urlopen"
"subways.overpass.urllib.request.urlopen"
) as urlopen_mock:
urlopen_mock.return_value.getcode.return_value = 200

View file

@ -2,7 +2,7 @@ import inspect
from pathlib import Path
from unittest import TestCase
from process_subways import prepare_cities
from subways.validation import prepare_cities
class TestPrepareCities(TestCase):

View file

@ -2,17 +2,23 @@ import collections
import itertools
import unittest
from subway_structure import project_on_segment
from subways.geom_utils import project_on_segment
from subways.types import LonLat
class TestProjection(unittest.TestCase):
"""Test subway_structure.project_on_segment function"""
"""Test subways.geom_utils.project_on_segment function"""
PRECISION = 10 # decimal places in assertAlmostEqual
SHIFT = 1e-6 # Small distance between projected point and segment endpoint
def _test_projection_in_bulk(self, points, segments, answers):
def _test_projection_in_bulk(
self,
points: list[LonLat],
segments: list[tuple[LonLat, LonLat]],
answers: list[float | None],
) -> None:
"""Test 'project_on_segment' function for array of points and array
of parallel segments projections on which are equal.
"""
@ -39,7 +45,7 @@ class TestProjection(unittest.TestCase):
f"{segment}: {u} returned, {answer} expected",
)
def test_projection_on_horizontal_segments(self):
def test_projection_on_horizontal_segments(self) -> None:
points = [
(-2, 0),
(-1 - self.SHIFT, 0),
@ -74,7 +80,7 @@ class TestProjection(unittest.TestCase):
self._test_projection_in_bulk(points, horizontal_segments, answers)
def test_projection_on_vertical_segments(self):
def test_projection_on_vertical_segments(self) -> None:
points = [
(0, -2),
(0, -1 - self.SHIFT),
@ -109,7 +115,7 @@ class TestProjection(unittest.TestCase):
self._test_projection_in_bulk(points, vertical_segments, answers)
def test_projection_on_inclined_segment(self):
def test_projection_on_inclined_segment(self) -> None:
points = [
(-2, -2),
(-1, -1),
@ -128,7 +134,7 @@ class TestProjection(unittest.TestCase):
self._test_projection_in_bulk(points, segments, answers)
def test_projection_with_different_collections(self):
def test_projection_with_different_collections(self) -> None:
"""The tested function should accept points as any consecutive
container with index operator.
"""
@ -148,7 +154,7 @@ class TestProjection(unittest.TestCase):
s2 = s2_type(segment_end2)
project_on_segment(p, s1, s2)
def test_projection_on_degenerate_segment(self):
def test_projection_on_degenerate_segment(self) -> None:
coords = [-1, 0, 1]
points = [(x, y) for x, y in itertools.product(coords, coords)]
segments = [

View file

@ -1,6 +1,6 @@
from subway_structure import RouteMaster
from tests.sample_data_for_twin_routes import metro_samples
from tests.util import TestCase
from subways.structure.route_master import RouteMaster
from subways.tests.sample_data_for_twin_routes import metro_samples
from subways.tests.util import TestCase
class TestRouteMaster(TestCase):

View file

@ -1,6 +1,6 @@
from unittest import TestCase
from subway_structure import Station
from subways.structure.station import Station
class TestStation(TestCase):

View file

@ -1,9 +1,9 @@
import json
from operator import itemgetter
from processors._common import transit_to_dict
from tests.sample_data_for_outputs import metro_samples
from tests.util import JsonLikeComparisonMixin, TestCase
from subways.processors._common import transit_to_dict
from subways.tests.sample_data_for_outputs import metro_samples
from subways.tests.util import JsonLikeComparisonMixin, TestCase
class TestStorage(JsonLikeComparisonMixin, TestCase):

View file

@ -4,13 +4,13 @@ from pathlib import Path
from typing import Any, TypeAlias, Self
from unittest import TestCase as unittestTestCase
from process_subways import (
from subways.structure.city import City, find_transfers
from subways.subway_io import load_xml
from subways.validation import (
add_osm_elements_to_cities,
validate_cities,
calculate_centers,
)
from subway_io import load_xml
from subway_structure import City, find_transfers
TestCaseMixin: TypeAlias = Self | unittestTestCase

14
subways/types.py Normal file
View file

@ -0,0 +1,14 @@
from typing import TypeAlias
# Dict describing a single OSM element
OsmElementT: TypeAlias = dict
IdT: TypeAlias = str  # Type of feature ids
TransferT: TypeAlias = set[IdT]  # A transfer is a set of StopArea IDs
TransfersT: TypeAlias = list[TransferT]  # A list of transfers
# Coordinate pair; the name suggests (longitude, latitude) order
LonLat: TypeAlias = tuple[float, float]
# A list of coordinate pairs; presumably the points of a rail track line
# - TODO confirm against its users
RailT: TypeAlias = list[LonLat]
class CriticalValidationError(Exception):
    """Raised when an error occurs that prevents
    further validation of a city."""

253
subways/validation.py Normal file
View file

@ -0,0 +1,253 @@
import csv
import logging
import urllib.request
from functools import partial
from subways.structure.city import City
from subways.types import CriticalValidationError, LonLat, OsmElementT
# Id of the Google spreadsheet that is used as the default cities list
DEFAULT_SPREADSHEET_ID = "1SEW1-NiNOnA2qDwievcxYV1FOaQl1mb1fdeyqAxHu3k"
# URL that exports the default spreadsheet in CSV format
DEFAULT_CITIES_INFO_URL = (
    "https://docs.google.com/spreadsheets/d/"
    f"{DEFAULT_SPREADSHEET_ID}/export?format=csv"
)
# Textual mark for bad cities; not used within this module itself
BAD_MARK = "[bad]"
def get_way_center(
    element: OsmElementT, node_centers: dict[int, LonLat]
) -> LonLat | None:
    """Return the center of a way, computing and caching it if needed.

    The center is the average of the known node coordinates and is
    stored in element["center"].

    :param element: dict describing OSM element
    :param node_centers: osm_id => LonLat
    :return: tuple with center coordinates, or None
    """
    # If elements have been queried via overpass-api with
    # 'out center;' clause then ways already have 'center' attribute
    if "center" in element:
        ready = element["center"]
        return ready["lon"], ready["lat"]

    if "nodes" not in element:
        return None

    node_ids = element["nodes"]
    # Don't count the first node of a closed way twice
    if len(node_ids) > 1 and node_ids[0] == node_ids[-1]:
        node_ids = node_ids[:-1]

    known_points = [
        node_centers[node_id]
        for node_id in node_ids
        if node_id in node_centers
    ]
    if not known_points:
        return None

    lon_total = 0
    lat_total = 0
    for point in known_points:
        lon_total += point[0]
        lat_total += point[1]

    points_count = len(known_points)
    element["center"] = {
        "lat": lat_total / points_count,
        "lon": lon_total / points_count,
    }
    return element["center"]["lon"], element["center"]["lat"]
def get_relation_center(
    element: OsmElementT,
    node_centers: dict[int, LonLat],
    way_centers: dict[int, LonLat],
    relation_centers: dict[int, LonLat],
    ignore_unlocalized_child_relations: bool = False,
) -> LonLat | None:
    """Return the center of a relation, computing and caching it if needed.

    The center is the average of the known centers of the members and is
    stored in element["center"].

    :param element: dict describing OSM element
    :param node_centers: osm_id => LonLat
    :param way_centers: osm_id => LonLat
    :param relation_centers: osm_id => LonLat
    :param ignore_unlocalized_child_relations: if a member that is a relation
        has no center, skip it and calculate center based on member nodes,
        ways and other, "localized" (with known centers), relations
    :return: tuple with center coordinates, or None
    """
    # If elements have been queried via overpass-api with
    # 'out center;' clause then some relations already have 'center'
    # attribute. But this is not the case for relations composed only
    # of other relations (e.g., route_master, stop_area_group or
    # stop_area with only members that are multipolygons)
    if "center" in element:
        ready = element["center"]
        return ready["lon"], ready["lat"]

    lon_total = 0
    lat_total = 0
    localized_count = 0
    for member in element.get("members", list()):
        ref = member["ref"]
        kind = member["type"]

        if kind == "relation" and ref not in relation_centers:
            if not ignore_unlocalized_child_relations:
                # Cannot calculate fair center because the center
                # of a child relation is not known yet
                return None
            continue

        if kind == "node":
            known_centers = node_centers
        elif kind == "way":
            known_centers = way_centers
        else:
            known_centers = relation_centers

        if ref not in known_centers:
            continue
        member_center = known_centers[ref]
        lon_total += member_center[0]
        lat_total += member_center[1]
        localized_count += 1

    if localized_count == 0:
        return None

    element["center"] = {
        "lat": lat_total / localized_count,
        "lon": lon_total / localized_count,
    }
    return element["center"]["lon"], element["center"]["lat"]
def calculate_centers(elements: list[OsmElementT]) -> None:
    """Adds 'center' key to each way/relation in elements,
    except for empty ways or relations.
    Relies on nodes-ways-relations order in the elements list.
    """
    node_centers: dict[int, LonLat] = {}  # id => LonLat
    way_centers: dict[int, LonLat] = {}  # id => approx center LonLat
    relation_centers: dict[int, LonLat] = {}  # id => approx center LonLat
    # Relations whose center has not been calculated yet
    pending: list[OsmElementT] = []

    for el in elements:
        el_type = el["type"]
        if el_type == "node":
            node_centers[el["id"]] = (el["lon"], el["lat"])
        elif el_type == "way":
            center = get_way_center(el, node_centers)
            if center:
                way_centers[el["id"]] = center
        elif el_type == "relation":
            center = get_relation_center(
                el, node_centers, way_centers, relation_centers
            )
            if center:
                relation_centers[el["id"]] = center
            else:
                pending.append(el)

    def run_pass(
        candidates: list[OsmElementT],
        ignore_unlocalized_child_relations: bool,
    ) -> list[OsmElementT]:
        """Try to localize each candidate; return those still pending."""
        still_pending = []
        for rel in candidates:
            center = get_relation_center(
                rel,
                node_centers,
                way_centers,
                relation_centers,
                ignore_unlocalized_child_relations,
            )
            if center:
                relation_centers[rel["id"]] = center
            else:
                still_pending.append(rel)
        return still_pending

    # Calculate centers for relations that have no one yet
    while pending:
        remaining = run_pass(pending, False)
        if len(remaining) >= len(pending):
            # The strict pass made no progress: retry, skipping child
            # relations without a center
            remaining = run_pass(pending, True)
            if len(remaining) >= len(pending):
                break
        pending = remaining
def add_osm_elements_to_cities(
    osm_elements: list[OsmElementT], cities: list[City]
) -> None:
    """Add every OSM element to each city that contains it."""
    for element in osm_elements:
        # The generator is lazy, so contains()/add() calls are interleaved
        # per city
        for city in (c for c in cities if c.contains(element)):
            city.add(element)
def validate_cities(cities: list[City]) -> list[City]:
    """Validate cities. Return list of good cities.

    A city whose route extraction fails with CriticalValidationError or
    AssertionError gets the error recorded and is not validated further.
    """
    validated_ok = []
    for city in cities:
        try:
            city.extract_routes()
        except CriticalValidationError as exc:
            logging.error(
                "Critical validation error while processing %s: %s",
                city.name,
                exc,
            )
            city.error(str(exc))
            continue
        except AssertionError as exc:
            logging.error(
                "Validation logic error while processing %s: %s",
                city.name,
                exc,
            )
            city.error(f"Validation logic error: {exc}")
            continue

        city.validate()
        if city.is_good:
            city.calculate_distances()
            validated_ok.append(city)
    return validated_ok
def get_cities_info(
    cities_info_url: str = DEFAULT_CITIES_INFO_URL,
) -> list[dict]:
    """Download the cities table in CSV format and parse it.

    The first row is treated as a header and skipped. Rows with an empty
    "id" or "bbox" are dropped. Duplicate city names are kept but logged
    as warnings.

    :param cities_info_url: URL of the CSV document; the response status
        is not checked for "file://" URLs
    :return: list of dicts keyed by the fixed column names listed below;
        empty list if the document has no rows at all
    :raises Exception: if a non-file URL is answered with a status
        other than 200
    """
    # The response is used as a context manager so that it gets closed
    # even if the status check, reading or decoding fails.
    with urllib.request.urlopen(cities_info_url) as response:
        if (
            not cities_info_url.startswith("file://")
            and (r_code := response.getcode()) != 200
        ):
            raise Exception(
                f"Failed to download cities spreadsheet: HTTP {r_code}"
            )
        data = response.read().decode("utf-8")

    reader = csv.DictReader(
        data.splitlines(),
        fieldnames=(
            "id",
            "name",
            "country",
            "continent",
            "num_stations",
            "num_lines",
            "num_light_lines",
            "num_interchanges",
            "bbox",
            "networks",
        ),
    )
    cities_info = list()
    names = set()
    # Skipping the header; the default keeps an empty document from
    # leaking StopIteration to the caller
    next(reader, None)
    for city_info in reader:
        if city_info["id"] and city_info["bbox"]:
            cities_info.append(city_info)
            name = city_info["name"].strip()
            if name in names:
                logging.warning(
                    "Duplicate city name in city list: %s",
                    city_info,
                )
            names.add(name)
    return cities_info
def prepare_cities(
    cities_info_url: str = DEFAULT_CITIES_INFO_URL, overground: bool = False
) -> list[City]:
    """Load the cities table and build a City object for each of its rows.

    :raises NotImplementedError: if overground is requested
    """
    if overground:
        raise NotImplementedError("Overground transit not implemented yet")
    return [
        City(city_info, overground=overground)
        for city_info in get_cities_info(cities_info_url)
    ]

View file

@ -2,14 +2,18 @@
Generate sorted list of all cities, with [bad] mark for bad cities.
!!! Deprecated for use in validation cycle.
Use "process_subways.py --dump-city-list <filename>" instead.
Use "scripts/process_subways.py --dump-city-list <filename>" instead.
"""
import argparse
import json
from process_subways import BAD_MARK, DEFAULT_CITIES_INFO_URL, get_cities_info
from subways.validation import (
BAD_MARK,
DEFAULT_CITIES_INFO_URL,
get_cities_info,
)
if __name__ == "__main__":
@ -19,7 +23,7 @@ if __name__ == "__main__":
used by subway render to generate the list of network at frontend.
It uses two sources: a mapsme.json validator output with good
networks, and a google spreadsheet with networks for the
process_subways.download_cities() function."""
subways.validation.get_cities_info() function."""
),
formatter_class=argparse.RawTextHelpFormatter,
)

View file

@ -3,7 +3,7 @@ import argparse
from shapely import unary_union
from shapely.geometry import MultiPolygon, Polygon
from process_subways import DEFAULT_CITIES_INFO_URL, get_cities_info
from subways.validation import DEFAULT_CITIES_INFO_URL, get_cities_info
def make_disjoint_metro_polygons(cities_info_url: str) -> None:

View file

View file

Can't render this file because it has a wrong number of fields in line 2.

View file

Can't render this file because it has a wrong number of fields in line 3.

View file

@ -1,6 +1,6 @@
import contextlib
import io
import os
from pathlib import Path
from unittest import TestCase
from make_all_metro_poly import make_disjoint_metro_polygons
@ -63,9 +63,8 @@ class TestMakeAllMetroPoly(TestCase):
def test_make_disjoint_metro_polygons(self) -> None:
for case in cases:
with self.subTest(msg=case["csv_file"]):
file_url = (
f"file://{os.getcwd()}/tests/assets/{case['csv_file']}"
)
assets_dir = Path(__file__).resolve().parent / "assets"
file_url = f"file://{assets_dir}/{case['csv_file']}"
stream = io.StringIO()
with contextlib.redirect_stdout(stream):
make_disjoint_metro_polygons(file_url)

View file

@ -9,7 +9,7 @@ import re
from collections import defaultdict
from typing import Any
from process_subways import DEFAULT_SPREADSHEET_ID
from subways.validation import DEFAULT_SPREADSHEET_ID
from v2h_templates import (
COUNTRY_CITY,
COUNTRY_FOOTER,