Compare commits


30 commits

Author | SHA1 | Message | Date
Alexey Zakharenkov | fa5e036053 | Clean requirements | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | ae0c568afe | Different improvements to validation_to_html.py | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | fc60bf56fa | Add --cities-info-url CLI parameter to the main script and utilities | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | 8f9265a18d | Remove unreachable code | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | 25cf8d9ec9 | Create directories for validator output if they do not exist | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | ec2f57b266 | Improve relation center calculation | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | d5f2026301 | Run the code through flake8 and black | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | 037d299943 | Create universal serializable transit data format, use it in GTFS processor | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | a7f135f1b0 | Factory pattern for route instantiation in City class | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | 2c71990f0e | Fix calculating stop positions for route with rails reversed relative to stops order | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | 36d947047e | Defer CSV-complying data transformation to write-to-CSV phase | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | e27fdd8c2d | Add to GTFS only transfers in which both stops are part of routes | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | 909c7b3c70 | Refactoring: make main() function and two more functions | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | a6adb9f0b5 | More accurate OSM element centers calculation | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | 9f2b141277 | Remove duplicate variable assignment | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | 4bb99a37ea | Refactoring: move a nested function to the module level; add tests for it | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | cf0ce1c55e | Add two new checks | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | 45f247d793 | Create method to allow overriding | 2023-03-14 19:15:44 +01:00
Alexey Zakharenkov | 89e4d5c261 | Take into account that City.validate() method may never be called for a city - in case a CriticalValidationError occurred | 2023-03-14 19:15:43 +01:00
Alexey Zakharenkov | aab668f550 | Validation error 'Stop ... is nowhere near the tracks' was mistakenly suppressed and now returned | 2023-03-14 19:15:43 +01:00
Alexey Zakharenkov | 0478791cf2 | Fix attribute assignment | 2023-03-14 19:15:43 +01:00
Alexey Zakharenkov | ee913b6d2d | Fixes to GTFS generation | 2023-03-14 19:15:43 +01:00
Alexey Zakharenkov | 82707f8528 | Make City.is_good a property | 2023-03-14 19:15:43 +01:00
Alexey Zakharenkov | d88755de99 | Use csv.DictReader instead of csv.reader to load city data | 2023-03-14 19:15:43 +01:00
Alexey Zakharenkov | 123d0f96d3 | Add tests on adjusting rails geometry; configure GitHub Actions for the tests | 2023-03-14 19:15:43 +01:00
Alexey Zakharenkov | 28c455d368 | Methods to adjust rails at route start/end | 2023-03-14 19:15:43 +01:00
Alexey Zakharenkov | 78304da88f | Refactor Route.__init__() to separate stops processing from rails processing | 2023-03-14 19:15:43 +01:00
Alexey Zakharenkov | 2224b9f318 | GTFS output | 2023-03-14 19:15:43 +01:00
Alexey Zakharenkov | fd27851c0b | Revert PR #18 as incorrect: there may be lines with the same ref in a network | 2023-03-14 19:15:43 +01:00
Jiaxun Yang | 7072494bbd | Suggest the parent route of platform with invalid role | 2023-03-14 19:15:43 +01:00
    The warning message now looks like:
    Platform "" (w1055889735) with invalid role '' in route (relation 2336456, "")
    It will be easier for us to sort those issues.
32 changed files with 4693 additions and 1624 deletions

.github/workflows/python-app.yml (vendored, new file, 39 lines)

@@ -0,0 +1,39 @@
# This workflow will install Python dependencies, run tests and lint with a single version of Python
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
name: Python application
on:
push:
branches: [ "master" ]
pull_request:
branches: [ "master" ]
permissions:
contents: read
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.8
uses: actions/setup-python@v3
with:
python-version: "3.8"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8==6.0.0 black==23.1.0
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Lint with flake8
run: |
flake8
- name: Check with black
run: |
black --check --line-length 79 .
- name: Test with unittest
run: |
python -m unittest discover tests

.gitignore (vendored, 1 line changed)

@@ -11,3 +11,4 @@ html/
*.yaml
*.pyc
*.txt
*.zip

README.md

@@ -5,6 +5,8 @@ systems in the world from OpenStreetMap. `subway_structure.py` produces
a list of disjunct systems that can be used for routing and for displaying
of metro maps.
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
## How To Validate
@@ -51,7 +53,7 @@ a city's bbox has been extended.
A single city or a country with few metro networks can be validated much faster
if you allow the `process_subway.py` to fetch data from Overpass API. Here are the steps:
1. Python3 interpreter required (3.5+)
1. Python3 interpreter required (3.8+)
2. Clone the repo
```
git clone https://github.com/alexey-zakharenkov/subways.git subways_validator
```

common.py

@@ -1,6 +1,6 @@
import functools
import logging
import math
import functools
"""A coordinate of a station precision of which we must take into account
@@ -16,42 +16,48 @@ def coords_eq(lon1, lat1, lon2, lat2):
def osm_id_comparator(el):
"""This function is used as key for sorting lists of
OSM-originated objects
OSM-originated objects
"""
return (el['osm_type'], el['osm_id'])
return (el["osm_type"], el["osm_id"])
def itinerary_comparator(itinerary):
"This function is used as key for sorting itineraries in a route"""
return (itinerary['stops'], itinerary['interval'])
"""This function is used as key for sorting itineraries in a route"""
return (itinerary["stops"], itinerary["interval"])
def compare_stops(stop0, stop1):
"""Compares json of two stops in route"""
stop_keys = ('name', 'int_name', 'id', 'osm_id', 'osm_type')
stop_keys = ("name", "int_name", "id", "osm_id", "osm_type")
stop0_props = tuple(stop0[k] for k in stop_keys)
stop1_props = tuple(stop1[k] for k in stop_keys)
if stop0_props != stop1_props:
logging.debug("Different stops properties: %s, %s",
stop0_props, stop1_props)
logging.debug(
"Different stops properties: %s, %s", stop0_props, stop1_props
)
return False
if not coords_eq(stop0['lon'], stop0['lat'],
stop1['lon'], stop1['lat']):
logging.debug("Different stops coordinates: %s (%f, %f), %s (%f, %f)",
stop0_props, stop0['lon'], stop0['lat'],
stop1_props, stop1['lon'], stop1['lat'])
if not coords_eq(stop0["lon"], stop0["lat"], stop1["lon"], stop1["lat"]):
logging.debug(
"Different stops coordinates: %s (%f, %f), %s (%f, %f)",
stop0_props,
stop0["lon"],
stop0["lat"],
stop1_props,
stop1["lon"],
stop1["lat"],
)
return False
entrances0 = sorted(stop0['entrances'], key=osm_id_comparator)
entrances1 = sorted(stop1['entrances'], key=osm_id_comparator)
entrances0 = sorted(stop0["entrances"], key=osm_id_comparator)
entrances1 = sorted(stop1["entrances"], key=osm_id_comparator)
if entrances0 != entrances1:
logging.debug("Different stop entrances")
return False
exits0 = sorted(stop0['exits'], key=osm_id_comparator)
exits1 = sorted(stop1['exits'], key=osm_id_comparator)
exits0 = sorted(stop0["exits"], key=osm_id_comparator)
exits1 = sorted(stop1["exits"], key=osm_id_comparator)
if exits0 != exits1:
logging.debug("Different stop exits")
return False
@@ -61,21 +67,24 @@ def compare_stops(stop0, stop1):
def compare_transfers(transfers0, transfers1):
"""Compares two arrays of transfers of the form
[(stop1_uid, stop2_uid, time), ...]
[(stop1_uid, stop2_uid, time), ...]
"""
if len(transfers0) != len(transfers1):
logging.debug("Different len(transfers): %d != %d",
len(transfers0), len(transfers1))
logging.debug(
"Different len(transfers): %d != %d",
len(transfers0),
len(transfers1),
)
return False
transfers0 = [tuple([t[0], t[1], t[2]])
if t[0] < t[1] else
tuple([t[1], t[0], t[2]])
for t in transfers0]
transfers1 = [tuple([t[0], t[1], t[2]])
if t[0] < t[1] else
tuple([t[1], t[0], t[2]])
for t in transfers1]
transfers0 = [
tuple([t[0], t[1], t[2]]) if t[0] < t[1] else tuple([t[1], t[0], t[2]])
for t in transfers0
]
transfers1 = [
tuple([t[0], t[1], t[2]]) if t[0] < t[1] else tuple([t[1], t[0], t[2]])
for t in transfers1
]
transfers0.sort()
transfers1.sort()
@@ -84,8 +93,9 @@ def compare_transfers(transfers0, transfers1):
for tr0, tr1 in zip(transfers0, transfers1):
if tr0 != tr1:
if diff_cnt == 0:
logging.debug("First pair of different transfers: %s, %s",
tr0, tr1)
logging.debug(
"First pair of different transfers: %s, %s", tr0, tr1
)
diff_cnt += 1
if diff_cnt:
logging.debug("Different transfers number = %d", diff_cnt)
@@ -95,46 +105,55 @@ def compare_transfers(transfers0, transfers1):
def compare_networks(network0, network1):
if network0['agency_id'] != network1['agency_id']:
logging.debug("Different agency_id at route '%s'",
network0['network'])
if network0["agency_id"] != network1["agency_id"]:
logging.debug("Different agency_id at route '%s'", network0["network"])
return False
route_ids0 = sorted(x['route_id'] for x in network0['routes'])
route_ids1 = sorted(x['route_id'] for x in network1['routes'])
route_ids0 = sorted(x["route_id"] for x in network0["routes"])
route_ids1 = sorted(x["route_id"] for x in network1["routes"])
if route_ids0 != route_ids1:
logging.debug("Different route_ids: %s != %s",
route_ids0, route_ids1)
logging.debug("Different route_ids: %s != %s", route_ids0, route_ids1)
return False
routes0 = sorted(network0['routes'], key=lambda x: x['route_id'])
routes1 = sorted(network1['routes'], key=lambda x: x['route_id'])
routes0 = sorted(network0["routes"], key=lambda x: x["route_id"])
routes1 = sorted(network1["routes"], key=lambda x: x["route_id"])
# Keys to compare routes. 'name' key is omitted since RouteMaster
# can get its name from one of its Routes unpredictably.
route_keys = ('type', 'ref', 'colour', 'route_id')
route_keys = ("type", "ref", "colour", "route_id")
for route0, route1 in zip(routes0, routes1):
route0_props = tuple(route0[k] for k in route_keys)
route1_props = tuple(route1[k] for k in route_keys)
if route0_props != route1_props:
logging.debug("Route props of '%s' are different: %s, %s",
route0['route_id'], route0_props, route1_props)
logging.debug(
"Route props of '%s' are different: %s, %s",
route0["route_id"],
route0_props,
route1_props,
)
return False
itineraries0 = sorted(route0['itineraries'], key=itinerary_comparator)
itineraries1 = sorted(route1['itineraries'], key=itinerary_comparator)
itineraries0 = sorted(route0["itineraries"], key=itinerary_comparator)
itineraries1 = sorted(route1["itineraries"], key=itinerary_comparator)
for itin0, itin1 in zip(itineraries0, itineraries1):
if itin0['interval'] != itin1['interval']:
logging.debug("Different interval: %d != %d at route %s '%s'",
itin0['interval'], itin1['interval'],
route0['route_id'], route0['name'])
if itin0["interval"] != itin1["interval"]:
logging.debug(
"Different interval: %d != %d at route %s '%s'",
itin0["interval"],
itin1["interval"],
route0["route_id"],
route0["name"],
)
return False
if itin0['stops'] != itin1['stops']:
logging.debug("Different stops at route %s '%s'",
route0['route_id'], route0['name'])
if itin0["stops"] != itin1["stops"]:
logging.debug(
"Different stops at route %s '%s'",
route0["route_id"],
route0["name"],
)
return False
return True

View file

@@ -10,10 +10,11 @@
affect the process_subways.py output really doesn't change it.
"""
import sys
import json
import logging
from common import compare_stops, compare_transfers, compare_networks
import sys
from common import compare_networks, compare_stops, compare_transfers
def compare_jsons(cache0, cache1):
@@ -28,21 +29,21 @@ def compare_jsons(cache0, cache1):
for name in city_names0:
city0 = cache0[name]
city1 = cache1[name]
if not compare_networks(city0['network'], city1['network']):
if not compare_networks(city0["network"], city1["network"]):
return False
stop_ids0 = sorted(city0['stops'].keys())
stop_ids1 = sorted(city1['stops'].keys())
stop_ids0 = sorted(city0["stops"].keys())
stop_ids1 = sorted(city1["stops"].keys())
if stop_ids0 != stop_ids1:
logging.debug("Different stop_ids")
return False
stops0 = [v for k, v in sorted(city0['stops'].items())]
stops1 = [v for k, v in sorted(city1['stops'].items())]
stops0 = [v for k, v in sorted(city0["stops"].items())]
stops1 = [v for k, v in sorted(city1["stops"].items())]
for stop0, stop1 in zip(stops0, stops1):
if not compare_stops(stop0, stop1):
return False
if not compare_transfers(city0['transfers'], city1['transfers']):
if not compare_transfers(city0["transfers"], city1["transfers"]):
return False
return True
@@ -57,8 +58,8 @@ if __name__ == "__main__":
path0, path1 = sys.argv[1:3]
j0 = json.load(open(path0, encoding='utf-8'))
j1 = json.load(open(path1, encoding='utf-8'))
j0 = json.load(open(path0, encoding="utf-8"))
j1 = json.load(open(path1, encoding="utf-8"))
equal = compare_jsons(j0, j1)

View file

@@ -10,38 +10,39 @@
affect the process_subways.py output really doesn't change it.
"""
import sys
import json
import logging
from common import compare_stops, compare_transfers, compare_networks
import sys
from common import compare_networks, compare_stops, compare_transfers
def compare_jsons(result0, result1):
"""Compares two objects which are results of subway generation"""
network_names0 = sorted([x['network'] for x in result0['networks']])
network_names1 = sorted([x['network'] for x in result1['networks']])
network_names0 = sorted([x["network"] for x in result0["networks"]])
network_names1 = sorted([x["network"] for x in result1["networks"]])
if network_names0 != network_names1:
logging.debug("Different list of network names!")
return False
networks0 = sorted(result0['networks'], key=lambda x: x['network'])
networks1 = sorted(result1['networks'], key=lambda x: x['network'])
networks0 = sorted(result0["networks"], key=lambda x: x["network"])
networks1 = sorted(result1["networks"], key=lambda x: x["network"])
for network0, network1 in zip(networks0, networks1):
if not compare_networks(network0, network1):
return False
stop_ids0 = sorted(x['id'] for x in result0['stops'])
stop_ids1 = sorted(x['id'] for x in result1['stops'])
stop_ids0 = sorted(x["id"] for x in result0["stops"])
stop_ids1 = sorted(x["id"] for x in result1["stops"])
if stop_ids0 != stop_ids1:
logging.debug("Different stop_ids")
return False
stops0 = sorted(result0['stops'], key=lambda x: x['id'])
stops1 = sorted(result1['stops'], key=lambda x: x['id'])
stops0 = sorted(result0["stops"], key=lambda x: x["id"])
stops1 = sorted(result1["stops"], key=lambda x: x["id"])
for stop0, stop1 in zip(stops0, stops1):
if not compare_stops(stop0, stop1):
return False
if not compare_transfers(result0['transfers'], result1['transfers']):
if not compare_transfers(result0["transfers"], result1["transfers"]):
return False
return True
@@ -56,8 +57,8 @@ if __name__ == "__main__":
path0, path1 = sys.argv[1:3]
j0 = json.load(open(path0, encoding='utf-8'))
j1 = json.load(open(path1, encoding='utf-8'))
j0 = json.load(open(path0, encoding="utf-8"))
j1 = json.load(open(path1, encoding="utf-8"))
equal = compare_jsons(j0, j1)

css_colours.py

@@ -2,153 +2,153 @@ import re
# Source: https://www.w3.org/TR/css3-color/#svg-color
CSS_COLOURS = {
'aliceblue': '#f0f8ff',
'antiquewhite': '#faebd7',
'aqua': '#00ffff',
'aquamarine': '#7fffd4',
'azure': '#f0ffff',
'beige': '#f5f5dc',
'bisque': '#ffe4c4',
'black': '#000000',
'blanchedalmond': '#ffebcd',
'blue': '#0000ff',
'blueviolet': '#8a2be2',
'brown': '#a52a2a',
'burlywood': '#deb887',
'cadetblue': '#5f9ea0',
'chartreuse': '#7fff00',
'chocolate': '#d2691e',
'coral': '#ff7f50',
'cornflowerblue': '#6495ed',
'cornsilk': '#fff8dc',
'crimson': '#dc143c',
'cyan': '#00ffff',
'darkblue': '#00008b',
'darkcyan': '#008b8b',
'darkgoldenrod': '#b8860b',
'darkgray': '#a9a9a9',
'darkgreen': '#006400',
'darkgrey': '#a9a9a9',
'darkkhaki': '#bdb76b',
'darkmagenta': '#8b008b',
'darkolivegreen': '#556b2f',
'darkorange': '#ff8c00',
'darkorchid': '#9932cc',
'darkred': '#8b0000',
'darksalmon': '#e9967a',
'darkseagreen': '#8fbc8f',
'darkslateblue': '#483d8b',
'darkslategray': '#2f4f4f',
'darkslategrey': '#2f4f4f',
'darkturquoise': '#00ced1',
'darkviolet': '#9400d3',
'deeppink': '#ff1493',
'deepskyblue': '#00bfff',
'dimgray': '#696969',
'dimgrey': '#696969',
'dodgerblue': '#1e90ff',
'firebrick': '#b22222',
'floralwhite': '#fffaf0',
'forestgreen': '#228b22',
'fuchsia': '#ff00ff',
'gainsboro': '#dcdcdc',
'ghostwhite': '#f8f8ff',
'gold': '#ffd700',
'goldenrod': '#daa520',
'gray': '#808080',
'green': '#008000',
'greenyellow': '#adff2f',
'grey': '#808080',
'honeydew': '#f0fff0',
'hotpink': '#ff69b4',
'indianred': '#cd5c5c',
'indigo': '#4b0082',
'ivory': '#fffff0',
'khaki': '#f0e68c',
'lavender': '#e6e6fa',
'lavenderblush': '#fff0f5',
'lawngreen': '#7cfc00',
'lemonchiffon': '#fffacd',
'lightblue': '#add8e6',
'lightcoral': '#f08080',
'lightcyan': '#e0ffff',
'lightgoldenrodyellow': '#fafad2',
'lightgray': '#d3d3d3',
'lightgreen': '#90ee90',
'lightgrey': '#d3d3d3',
'lightpink': '#ffb6c1',
'lightsalmon': '#ffa07a',
'lightseagreen': '#20b2aa',
'lightskyblue': '#87cefa',
'lightslategray': '#778899',
'lightslategrey': '#778899',
'lightsteelblue': '#b0c4de',
'lightyellow': '#ffffe0',
'lime': '#00ff00',
'limegreen': '#32cd32',
'linen': '#faf0e6',
'magenta': '#ff00ff',
'maroon': '#800000',
'mediumaquamarine': '#66cdaa',
'mediumblue': '#0000cd',
'mediumorchid': '#ba55d3',
'mediumpurple': '#9370db',
'mediumseagreen': '#3cb371',
'mediumslateblue': '#7b68ee',
'mediumspringgreen': '#00fa9a',
'mediumturquoise': '#48d1cc',
'mediumvioletred': '#c71585',
'midnightblue': '#191970',
'mintcream': '#f5fffa',
'mistyrose': '#ffe4e1',
'moccasin': '#ffe4b5',
'navajowhite': '#ffdead',
'navy': '#000080',
'oldlace': '#fdf5e6',
'olive': '#808000',
'olivedrab': '#6b8e23',
'orange': '#ffa500',
'orangered': '#ff4500',
'orchid': '#da70d6',
'palegoldenrod': '#eee8aa',
'palegreen': '#98fb98',
'paleturquoise': '#afeeee',
'palevioletred': '#db7093',
'papayawhip': '#ffefd5',
'peachpuff': '#ffdab9',
'peru': '#cd853f',
'pink': '#ffc0cb',
'plum': '#dda0dd',
'powderblue': '#b0e0e6',
'purple': '#800080',
'red': '#ff0000',
'rosybrown': '#bc8f8f',
'royalblue': '#4169e1',
'saddlebrown': '#8b4513',
'salmon': '#fa8072',
'sandybrown': '#f4a460',
'seagreen': '#2e8b57',
'seashell': '#fff5ee',
'sienna': '#a0522d',
'silver': '#c0c0c0',
'skyblue': '#87ceeb',
'slateblue': '#6a5acd',
'slategray': '#708090',
'slategrey': '#708090',
'snow': '#fffafa',
'springgreen': '#00ff7f',
'steelblue': '#4682b4',
'tan': '#d2b48c',
'teal': '#008080',
'thistle': '#d8bfd8',
'tomato': '#ff6347',
'turquoise': '#40e0d0',
'violet': '#ee82ee',
'wheat': '#f5deb3',
'white': '#ffffff',
'whitesmoke': '#f5f5f5',
'yellow': '#ffff00',
'yellowgreen': '#9acd32',
"aliceblue": "#f0f8ff",
"antiquewhite": "#faebd7",
"aqua": "#00ffff",
"aquamarine": "#7fffd4",
"azure": "#f0ffff",
"beige": "#f5f5dc",
"bisque": "#ffe4c4",
"black": "#000000",
"blanchedalmond": "#ffebcd",
"blue": "#0000ff",
"blueviolet": "#8a2be2",
"brown": "#a52a2a",
"burlywood": "#deb887",
"cadetblue": "#5f9ea0",
"chartreuse": "#7fff00",
"chocolate": "#d2691e",
"coral": "#ff7f50",
"cornflowerblue": "#6495ed",
"cornsilk": "#fff8dc",
"crimson": "#dc143c",
"cyan": "#00ffff",
"darkblue": "#00008b",
"darkcyan": "#008b8b",
"darkgoldenrod": "#b8860b",
"darkgray": "#a9a9a9",
"darkgreen": "#006400",
"darkgrey": "#a9a9a9",
"darkkhaki": "#bdb76b",
"darkmagenta": "#8b008b",
"darkolivegreen": "#556b2f",
"darkorange": "#ff8c00",
"darkorchid": "#9932cc",
"darkred": "#8b0000",
"darksalmon": "#e9967a",
"darkseagreen": "#8fbc8f",
"darkslateblue": "#483d8b",
"darkslategray": "#2f4f4f",
"darkslategrey": "#2f4f4f",
"darkturquoise": "#00ced1",
"darkviolet": "#9400d3",
"deeppink": "#ff1493",
"deepskyblue": "#00bfff",
"dimgray": "#696969",
"dimgrey": "#696969",
"dodgerblue": "#1e90ff",
"firebrick": "#b22222",
"floralwhite": "#fffaf0",
"forestgreen": "#228b22",
"fuchsia": "#ff00ff",
"gainsboro": "#dcdcdc",
"ghostwhite": "#f8f8ff",
"gold": "#ffd700",
"goldenrod": "#daa520",
"gray": "#808080",
"green": "#008000",
"greenyellow": "#adff2f",
"grey": "#808080",
"honeydew": "#f0fff0",
"hotpink": "#ff69b4",
"indianred": "#cd5c5c",
"indigo": "#4b0082",
"ivory": "#fffff0",
"khaki": "#f0e68c",
"lavender": "#e6e6fa",
"lavenderblush": "#fff0f5",
"lawngreen": "#7cfc00",
"lemonchiffon": "#fffacd",
"lightblue": "#add8e6",
"lightcoral": "#f08080",
"lightcyan": "#e0ffff",
"lightgoldenrodyellow": "#fafad2",
"lightgray": "#d3d3d3",
"lightgreen": "#90ee90",
"lightgrey": "#d3d3d3",
"lightpink": "#ffb6c1",
"lightsalmon": "#ffa07a",
"lightseagreen": "#20b2aa",
"lightskyblue": "#87cefa",
"lightslategray": "#778899",
"lightslategrey": "#778899",
"lightsteelblue": "#b0c4de",
"lightyellow": "#ffffe0",
"lime": "#00ff00",
"limegreen": "#32cd32",
"linen": "#faf0e6",
"magenta": "#ff00ff",
"maroon": "#800000",
"mediumaquamarine": "#66cdaa",
"mediumblue": "#0000cd",
"mediumorchid": "#ba55d3",
"mediumpurple": "#9370db",
"mediumseagreen": "#3cb371",
"mediumslateblue": "#7b68ee",
"mediumspringgreen": "#00fa9a",
"mediumturquoise": "#48d1cc",
"mediumvioletred": "#c71585",
"midnightblue": "#191970",
"mintcream": "#f5fffa",
"mistyrose": "#ffe4e1",
"moccasin": "#ffe4b5",
"navajowhite": "#ffdead",
"navy": "#000080",
"oldlace": "#fdf5e6",
"olive": "#808000",
"olivedrab": "#6b8e23",
"orange": "#ffa500",
"orangered": "#ff4500",
"orchid": "#da70d6",
"palegoldenrod": "#eee8aa",
"palegreen": "#98fb98",
"paleturquoise": "#afeeee",
"palevioletred": "#db7093",
"papayawhip": "#ffefd5",
"peachpuff": "#ffdab9",
"peru": "#cd853f",
"pink": "#ffc0cb",
"plum": "#dda0dd",
"powderblue": "#b0e0e6",
"purple": "#800080",
"red": "#ff0000",
"rosybrown": "#bc8f8f",
"royalblue": "#4169e1",
"saddlebrown": "#8b4513",
"salmon": "#fa8072",
"sandybrown": "#f4a460",
"seagreen": "#2e8b57",
"seashell": "#fff5ee",
"sienna": "#a0522d",
"silver": "#c0c0c0",
"skyblue": "#87ceeb",
"slateblue": "#6a5acd",
"slategray": "#708090",
"slategrey": "#708090",
"snow": "#fffafa",
"springgreen": "#00ff7f",
"steelblue": "#4682b4",
"tan": "#d2b48c",
"teal": "#008080",
"thistle": "#d8bfd8",
"tomato": "#ff6347",
"turquoise": "#40e0d0",
"violet": "#ee82ee",
"wheat": "#f5deb3",
"white": "#ffffff",
"whitesmoke": "#f5f5f5",
"yellow": "#ffff00",
"yellowgreen": "#9acd32",
}
@@ -158,8 +158,8 @@ def normalize_colour(c):
c = c.strip().lower()
if c in CSS_COLOURS:
return CSS_COLOURS[c]
if re.match(r'^#?[0-9a-f]{3}([0-9a-f]{3})?$', c):
if re.match(r"^#?[0-9a-f]{3}([0-9a-f]{3})?$", c):
if len(c) == 4:
return c[0]+c[1]+c[1]+c[2]+c[2]+c[3]+c[3]
return c[0] + c[1] + c[1] + c[2] + c[2] + c[3] + c[3]
return c
raise ValueError('Unknown colour code: {}'.format(c))
raise ValueError("Unknown colour code: {}".format(c))

make_all_metro_poly.py

@@ -1,20 +1,23 @@
import argparse
import shapely.geometry
import shapely.ops
from process_subways import download_cities
from process_subways import DEFAULT_CITIES_INFO_URL, get_cities_info
def make_disjoint_metro_polygons():
cities = download_cities()
def make_disjoint_metro_polygons(cities_info_url: str) -> None:
cities_info = get_cities_info(cities_info_url)
polygons = []
for c in cities:
for ci in cities_info:
bbox = tuple(map(float, ci["bbox"].split(",")))
polygon = shapely.geometry.Polygon(
[
(c.bbox[1], c.bbox[0]),
(c.bbox[1], c.bbox[2]),
(c.bbox[3], c.bbox[2]),
(c.bbox[3], c.bbox[0]),
(bbox[0], bbox[1]),
(bbox[0], bbox[3]),
(bbox[2], bbox[3]),
(bbox[2], bbox[1]),
]
)
polygons.append(polygon)
@@ -31,5 +34,19 @@ def make_disjoint_metro_polygons():
print("END")
if __name__ == '__main__':
make_disjoint_metro_polygons()
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument(
"--cities-info-url",
default=DEFAULT_CITIES_INFO_URL,
help=(
"URL of CSV file with reference information about rapid transit "
"networks. file:// protocol is also supported."
),
)
options = parser.parse_args()
make_disjoint_metro_polygons(options.cities_info_url)
if __name__ == "__main__":
main()

mapsme_json_to_cities.py

@@ -1,28 +1,41 @@
import argparse
import json
from process_subways import download_cities
from process_subways import DEFAULT_CITIES_INFO_URL, get_cities_info
if __name__ == '__main__':
if __name__ == "__main__":
arg_parser = argparse.ArgumentParser(
description="""
This script generates a list of good/all network names.
It is used by subway render to generate the list of network at frontend.
It uses two sources: a mapsme.json validator output with good networks, and
a google spreadsheet with networks for the process_subways.download_cities()
function.""",
description=(
"""This script generates a list of good/all network names. It is
used by subway render to generate the list of networks at the frontend.
It uses two sources: a mapsme.json validator output with good
networks, and a google spreadsheet with networks for the
process_subways.download_cities() function."""
),
formatter_class=argparse.RawTextHelpFormatter,
)
arg_parser.add_argument(
'subway_json_file',
type=argparse.FileType('r'),
help="Validator output defined by -o option of process_subways.py script",
"subway_json_file",
type=argparse.FileType("r"),
help=(
"Validator output defined by -o option "
"of process_subways.py script",
),
)
arg_parser.add_argument(
'--with-bad',
"--cities-info-url",
default=DEFAULT_CITIES_INFO_URL,
help=(
"URL of CSV file with reference information about rapid transit "
"networks. file:// protocol is also supported."
),
)
arg_parser.add_argument(
"--with-bad",
action="store_true",
help="Whether to include cities validation of which was failed",
)
@@ -34,16 +47,16 @@ if __name__ == '__main__':
subway_json = json.load(subway_json_file)
good_cities = set(
n.get('network', n.get('title')) for n in subway_json['networks']
n.get("network", n.get("title")) for n in subway_json["networks"]
)
cities = download_cities()
cities_info = get_cities_info(args.cities_info_url)
lines = []
for c in cities:
if c.name in good_cities:
lines.append(f"{c.name}, {c.country}")
for ci in cities_info:
if ci["name"] in good_cities:
lines.append(f"{ci['name']}, {ci['country']}")
elif with_bad:
lines.append(f"{c.name}, {c.country} (Bad)")
lines.append(f"{ci['name']}, {ci['country']} (Bad)")
for line in sorted(lines):
print(line)

process_subways.py

@@ -1,5 +1,7 @@
#!/usr/bin/env python3
import argparse
import csv
import inspect
import json
import logging
import os
@@ -8,7 +10,10 @@ import sys
import time
import urllib.parse
import urllib.request
from processors import processor
from functools import partial
from typing import Dict, List, Optional, Tuple
import processors
from subway_io import (
dump_yaml,
load_xml,
@@ -17,8 +22,8 @@ from subway_io import (
write_recovery_data,
)
from subway_structure import (
City,
CriticalValidationError,
download_cities,
find_transfers,
get_unused_entrances_geojson,
MODES_OVERGROUND,
@@ -26,31 +31,38 @@ from subway_structure import (
)
DEFAULT_SPREADSHEET_ID = "1SEW1-NiNOnA2qDwievcxYV1FOaQl1mb1fdeyqAxHu3k"
DEFAULT_CITIES_INFO_URL = (
"https://docs.google.com/spreadsheets/d/"
f"{DEFAULT_SPREADSHEET_ID}/export?format=csv"
)
Point = Tuple[float, float]
def overpass_request(overground, overpass_api, bboxes):
query = '[out:json][timeout:1000];('
query = "[out:json][timeout:1000];("
modes = MODES_OVERGROUND if overground else MODES_RAPID
for bbox in bboxes:
bbox_part = '({})'.format(','.join(str(coord) for coord in bbox))
query += '('
bbox_part = "({})".format(",".join(str(coord) for coord in bbox))
query += "("
for mode in modes:
query += 'rel[route="{}"]{};'.format(mode, bbox_part)
query += ');'
query += 'rel(br)[type=route_master];'
query += ");"
query += "rel(br)[type=route_master];"
if not overground:
query += 'node[railway=subway_entrance]{};'.format(bbox_part)
query += 'rel[public_transport=stop_area]{};'.format(bbox_part)
query += "node[railway=subway_entrance]{};".format(bbox_part)
query += "rel[public_transport=stop_area]{};".format(bbox_part)
query += (
'rel(br)[type=public_transport][public_transport=stop_area_group];'
"rel(br)[type=public_transport][public_transport=stop_area_group];"
)
query += ');(._;>>;);out body center qt;'
logging.debug('Query: %s', query)
url = '{}?data={}'.format(overpass_api, urllib.parse.quote(query))
query += ");(._;>>;);out body center qt;"
logging.debug("Query: %s", query)
url = "{}?data={}".format(overpass_api, urllib.parse.quote(query))
response = urllib.request.urlopen(url, timeout=1000)
if response.getcode() != 200:
raise Exception(
'Failed to query Overpass API: HTTP {}'.format(response.getcode())
)
return json.load(response)['elements']
if (r_code := response.getcode()) != 200:
raise Exception(f"Failed to query Overpass API: HTTP {r_code}")
return json.load(response)["elements"]
def multi_overpass(overground, overpass_api, bboxes):
@@ -60,16 +72,108 @@ def multi_overpass(overground, overpass_api, bboxes):
for i in range(0, len(bboxes) + SLICE_SIZE - 1, SLICE_SIZE):
if i > 0:
time.sleep(INTERREQUEST_WAIT)
result.extend(
overpass_request(
overground, overpass_api, bboxes[i : i + SLICE_SIZE]
)
)
bboxes_i = bboxes[i : i + SLICE_SIZE] # noqa E203
result.extend(overpass_request(overground, overpass_api, bboxes_i))
return result
def slugify(name):
return re.sub(r'[^a-z0-9_-]+', '', name.lower().replace(' ', '_'))
return re.sub(r"[^a-z0-9_-]+", "", name.lower().replace(" ", "_"))
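Note: slugify lowercases the name, turns spaces into underscores, and drops every character outside [a-z0-9_-]; for example:

```
print(slugify("Buenos Aires"))  # "buenos_aires"
print(slugify("São Paulo"))     # "so_paulo" (the non-ASCII "ã" is dropped)
```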
def get_way_center(
element: dict, node_centers: Dict[int, Point]
) -> Optional[Point]:
"""
:param element: dict describing OSM element
:param node_centers: osm_id => (lat, lon)
:return: tuple with center coordinates, or None
"""
# If elements have been queried via overpass-api with
# 'out center;' clause then ways already have 'center' attribute
if "center" in element:
return element["center"]["lat"], element["center"]["lon"]
if "nodes" not in element:
return None
center = [0, 0]
count = 0
way_nodes = element["nodes"]
way_nodes_len = len(element["nodes"])
for i, nd in enumerate(way_nodes):
if nd not in node_centers:
continue
# Don't count the first node of a closed way twice
if (
i == way_nodes_len - 1
and way_nodes_len > 1
and way_nodes[0] == way_nodes[-1]
):
break
center[0] += node_centers[nd][0]
center[1] += node_centers[nd][1]
count += 1
if count == 0:
return None
element["center"] = {"lat": center[0] / count, "lon": center[1] / count}
return element["center"]["lat"], element["center"]["lon"]
def get_relation_center(
element: dict,
node_centers: Dict[int, Point],
way_centers: Dict[int, Point],
relation_centers: Dict[int, Point],
ignore_unlocalized_child_relations: bool = False,
) -> Optional[Point]:
"""
:param element: dict describing OSM element
:param node_centers: osm_id => (lat, lon)
:param way_centers: osm_id => (lat, lon)
:param relation_centers: osm_id => (lat, lon)
:param ignore_unlocalized_child_relations: if a member that is a relation
has no center, skip it and calculate center based on member nodes,
ways and other, "localized" (with known centers), relations
:return: tuple with center coordinates, or None
"""
# If elements have been queried via overpass-api with
# 'out center;' clause then some relations already have 'center'
# attribute. But this is not the case for relations composed only
# of other relations (e.g., route_master, stop_area_group or
# stop_area with only members that are multipolygons)
if "center" in element:
return element["center"]["lat"], element["center"]["lon"]
center = [0, 0]
count = 0
for m in element.get("members", list()):
m_id = m["ref"]
m_type = m["type"]
if m_type == "relation" and m_id not in relation_centers:
if ignore_unlocalized_child_relations:
continue
else:
# Cannot calculate fair center because the center
# of a child relation is not known yet
return None
member_container = (
node_centers
if m_type == "node"
else way_centers
if m_type == "way"
else relation_centers
)
if m_id in member_container:
center[0] += member_container[m_id][0]
center[1] += member_container[m_id][1]
count += 1
if count == 0:
return None
element["center"] = {"lat": center[0] / count, "lon": center[1] / count}
return element["center"]["lat"], element["center"]["lon"]
def calculate_centers(elements):
@@ -77,163 +181,219 @@ def calculate_centers(elements):
except for empty ways or relations.
Relies on nodes-ways-relations order in the elements list.
"""
nodes = {} # id(int) => (lat, lon)
ways = {} # id(int) => (lat, lon)
relations = {} # id(int) => (lat, lon)
empty_relations = set() # ids(int) of relations without members
# or containing only empty relations
nodes: Dict[int, Point] = {} # id => (lat, lon)
ways: Dict[int, Point] = {} # id => (lat, lon)
relations: Dict[int, Point] = {} # id => (lat, lon)
def calculate_way_center(el):
# If element has been queried via overpass-api with 'out center;'
# clause then ways already have 'center' attribute
if 'center' in el:
ways[el['id']] = (el['center']['lat'], el['center']['lon'])
return
center = [0, 0]
count = 0
for nd in el['nodes']:
if nd in nodes:
center[0] += nodes[nd][0]
center[1] += nodes[nd][1]
count += 1
if count > 0:
el['center'] = {'lat': center[0] / count, 'lon': center[1] / count}
ways[el['id']] = (el['center']['lat'], el['center']['lon'])
def calculate_relation_center(el):
# If element has been queried via overpass-api with 'out center;'
# clause then some relations already have 'center' attribute
if 'center' in el:
relations[el['id']] = (el['center']['lat'], el['center']['lon'])
return True
center = [0, 0]
count = 0
for m in el.get('members', []):
if m['type'] == 'relation' and m['ref'] not in relations:
if m['ref'] in empty_relations:
# Ignore empty child relations
continue
else:
# Center of child relation is not known yet
return False
member_container = (
nodes
if m['type'] == 'node'
else ways
if m['type'] == 'way'
else relations
)
if m['ref'] in member_container:
center[0] += member_container[m['ref']][0]
center[1] += member_container[m['ref']][1]
count += 1
if count == 0:
empty_relations.add(el['id'])
else:
el['center'] = {'lat': center[0] / count, 'lon': center[1] / count}
relations[el['id']] = (el['center']['lat'], el['center']['lon'])
return True
relations_without_center = []
unlocalized_relations = [] # 'unlocalized' means the center of the
# relation has not been calculated yet
for el in elements:
if el['type'] == 'node':
nodes[el['id']] = (el['lat'], el['lon'])
elif el['type'] == 'way':
if 'nodes' in el:
calculate_way_center(el)
elif el['type'] == 'relation':
if not calculate_relation_center(el):
relations_without_center.append(el)
if el["type"] == "node":
nodes[el["id"]] = (el["lat"], el["lon"])
elif el["type"] == "way":
if center := get_way_center(el, nodes):
ways[el["id"]] = center
elif el["type"] == "relation":
if center := get_relation_center(el, nodes, ways, relations):
relations[el["id"]] = center
else:
unlocalized_relations.append(el)
def iterate_relation_centers_calculation(
ignore_unlocalized_child_relations: bool,
) -> List[int]:
unlocalized_relations_upd = []
for rel in unlocalized_relations:
if center := get_relation_center(
rel, nodes, ways, relations, ignore_unlocalized_child_relations
):
relations[rel["id"]] = center
else:
unlocalized_relations_upd.append(rel)
return unlocalized_relations_upd
# Calculate centers for relations that have no one yet
while relations_without_center:
new_relations_without_center = []
for rel in relations_without_center:
if not calculate_relation_center(rel):
new_relations_without_center.append(rel)
if len(new_relations_without_center) == len(relations_without_center):
break
relations_without_center = new_relations_without_center
while unlocalized_relations:
unlocalized_relations_upd = iterate_relation_centers_calculation(False)
progress = len(unlocalized_relations_upd) < len(unlocalized_relations)
if not progress:
unlocalized_relations_upd = iterate_relation_centers_calculation(
True
)
progress = len(unlocalized_relations_upd) < len(
unlocalized_relations
)
if not progress:
break
unlocalized_relations = unlocalized_relations_upd
if relations_without_center:
logging.error(
"Cannot calculate center for the relations (%d in total): %s%s",
len(relations_without_center),
', '.join(str(rel['id']) for rel in relations_without_center[:20]),
", ..." if len(relations_without_center) > 20 else "",
)
if empty_relations:
logging.warning(
"Empty relations (%d in total): %s%s",
len(empty_relations),
', '.join(str(x) for x in list(empty_relations)[:20]),
", ..." if len(empty_relations) > 20 else "",
def add_osm_elements_to_cities(osm_elements, cities):
for el in osm_elements:
for c in cities:
if c.contains(el):
c.add(el)
def validate_cities(cities):
"""Validate cities. Return list of good cities."""
good_cities = []
for c in cities:
try:
c.extract_routes()
except CriticalValidationError as e:
logging.error(
"Critical validation error while processing %s: %s",
c.name,
e,
)
c.error(str(e))
except AssertionError as e:
logging.error(
"Validation logic error while processing %s: %s",
c.name,
e,
)
c.error(f"Validation logic error: {e}")
else:
c.validate()
if c.is_good:
good_cities.append(c)
return good_cities
def get_cities_info(
cities_info_url: str = DEFAULT_CITIES_INFO_URL,
) -> List[dict]:
response = urllib.request.urlopen(cities_info_url)
if (
not cities_info_url.startswith("file://")
and (r_code := response.getcode()) != 200
):
raise Exception(
f"Failed to download cities spreadsheet: HTTP {r_code}"
)
data = response.read().decode("utf-8")
reader = csv.DictReader(
data.splitlines(),
fieldnames=(
"id",
"name",
"country",
"continent",
"num_stations",
"num_lines",
"num_light_lines",
"num_interchanges",
"bbox",
"networks",
),
)
cities_info = list()
names = set()
next(reader) # skipping the header
for city_info in reader:
if city_info["id"] and city_info["bbox"]:
cities_info.append(city_info)
name = city_info["name"].strip()
if name in names:
logging.warning(
"Duplicate city name in city list: %s",
city_info,
)
names.add(name)
return cities_info
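Note: since file:// URLs are accepted, get_cities_info() can be exercised against a local CSV (a sketch with a made-up city row; the column order must match the fieldnames above):

```
import pathlib
import tempfile

csv_text = (
    "id,name,country,continent,num_stations,num_lines,num_light_lines,"
    "num_interchanges,bbox,networks\n"
    '1,Demo City,Nowhere,Europe,10,1,0,0,"53.76,27.40,54.01,27.73",\n'
)
path = pathlib.Path(tempfile.gettempdir()) / "cities.csv"
path.write_text(csv_text, encoding="utf-8")
for ci in get_cities_info(f"file://{path}"):
    print(ci["name"], ci["bbox"])  # Demo City 53.76,27.40,54.01,27.73
```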
if __name__ == '__main__':
def prepare_cities(
cities_info_url: str = DEFAULT_CITIES_INFO_URL, overground: bool = False
) -> List[City]:
if overground:
raise NotImplementedError("Overground transit not implemented yet")
cities_info = get_cities_info(cities_info_url)
return list(map(partial(City, overground=overground), cities_info))
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
'-i',
'--source',
help='File to write backup of OSM data, or to read data from',
"--cities-info-url",
default=DEFAULT_CITIES_INFO_URL,
help=(
"URL of CSV file with reference information about rapid transit "
"networks. file:// protocol is also supported."
),
)
parser.add_argument(
'-x', '--xml', help='OSM extract with routes, to read data from'
"-i",
"--source",
help="File to write backup of OSM data, or to read data from",
)
parser.add_argument(
'--overpass-api',
default='http://overpass-api.de/api/interpreter',
"-x", "--xml", help="OSM extract with routes, to read data from"
)
parser.add_argument(
"--overpass-api",
default="http://overpass-api.de/api/interpreter",
help="Overpass API URL",
)
parser.add_argument(
'-q',
'--quiet',
action='store_true',
help='Show only warnings and errors',
"-q",
"--quiet",
action="store_true",
help="Show only warnings and errors",
)
parser.add_argument(
'-c', '--city', help='Validate only a single city or a country'
"-c", "--city", help="Validate only a single city or a country"
)
parser.add_argument(
'-t',
'--overground',
action='store_true',
help='Process overground transport instead of subways',
"-t",
"--overground",
action="store_true",
help="Process overground transport instead of subways",
)
parser.add_argument(
'-e',
'--entrances',
type=argparse.FileType('w', encoding='utf-8'),
help='Export unused subway entrances as GeoJSON here',
"-e",
"--entrances",
type=argparse.FileType("w", encoding="utf-8"),
help="Export unused subway entrances as GeoJSON here",
)
parser.add_argument(
'-l',
'--log',
type=argparse.FileType('w', encoding='utf-8'),
help='Validation JSON file name',
"-l",
"--log",
type=argparse.FileType("w", encoding="utf-8"),
help="Validation JSON file name",
)
for processor_name, processor in inspect.getmembers(
processors, inspect.ismodule
):
if not processor_name.startswith("_"):
parser.add_argument(
f"--output-{processor_name}",
help=(
"Processed metro systems output filename "
f"in {processor_name.upper()} format"
),
)
parser.add_argument("--cache", help="Cache file name for processed data")
parser.add_argument(
"-r", "--recovery-path", help="Cache file name for error recovery"
)
parser.add_argument(
'-o',
'--output',
type=argparse.FileType('w', encoding='utf-8'),
help='Processed metro systems output',
)
parser.add_argument('--cache', help='Cache file name for processed data')
parser.add_argument(
'-r', '--recovery-path', help='Cache file name for error recovery'
"-d", "--dump", help="Make a YAML file for a city data"
)
parser.add_argument(
'-d', '--dump', help='Make a YAML file for a city data'
"-j", "--geojson", help="Make a GeoJSON file for a city data"
)
parser.add_argument(
'-j', '--geojson', help='Make a GeoJSON file for a city data'
)
parser.add_argument(
'--crude',
action='store_true',
help='Do not use OSM railway geometry for GeoJSON',
"--crude",
action="store_true",
help="Do not use OSM railway geometry for GeoJSON",
)
options = parser.parse_args()
@@ -243,12 +403,11 @@ if __name__ == '__main__':
log_level = logging.INFO
logging.basicConfig(
level=log_level,
datefmt='%H:%M:%S',
format='%(asctime)s %(levelname)-7s %(message)s',
datefmt="%H:%M:%S",
format="%(asctime)s %(levelname)-7s %(message)s",
)
# Downloading cities from Google Spreadsheets
cities = download_cities(options.overground)
cities = prepare_cities(options.cities_info_url, options.overground)
if options.city:
cities = [
c
@@ -256,7 +415,7 @@ if __name__ == '__main__':
if c.name == options.city or c.country == options.city
]
if not cities:
logging.error('No cities to process')
logging.error("No cities to process")
sys.exit(2)
# Augment cities with recovery data
@@ -266,83 +425,59 @@ if __name__ == '__main__':
for city in cities:
city.recovery_data = recovery_data.get(city.name, None)
logging.info('Read %s metro networks', len(cities))
logging.info("Read %s metro networks", len(cities))
# Reading cached json, loading XML or querying Overpass API
if options.source and os.path.exists(options.source):
logging.info('Reading %s', options.source)
with open(options.source, 'r') as f:
logging.info("Reading %s", options.source)
with open(options.source, "r") as f:
osm = json.load(f)
if 'elements' in osm:
osm = osm['elements']
if "elements" in osm:
osm = osm["elements"]
calculate_centers(osm)
elif options.xml:
logging.info('Reading %s', options.xml)
logging.info("Reading %s", options.xml)
osm = load_xml(options.xml)
calculate_centers(osm)
if options.source:
with open(options.source, 'w', encoding='utf-8') as f:
with open(options.source, "w", encoding="utf-8") as f:
json.dump(osm, f)
else:
if len(cities) > 10:
logging.error(
'Would not download that many cities from Overpass API, '
'choose a smaller set'
"Would not download that many cities from Overpass API, "
"choose a smaller set"
)
sys.exit(3)
bboxes = [c.bbox for c in cities]
logging.info('Downloading data from Overpass API')
logging.info("Downloading data from Overpass API")
osm = multi_overpass(options.overground, options.overpass_api, bboxes)
calculate_centers(osm)
if options.source:
with open(options.source, 'w', encoding='utf-8') as f:
with open(options.source, "w", encoding="utf-8") as f:
json.dump(osm, f)
logging.info('Downloaded %s elements, sorting by city', len(osm))
logging.info("Downloaded %s elements", len(osm))
# Sorting elements by city and prepare a dict
for el in osm:
for c in cities:
if c.contains(el):
c.add(el)
logging.info("Sorting elements by city")
add_osm_elements_to_cities(osm, cities)
logging.info('Building routes for each city')
good_cities = []
for c in cities:
try:
c.extract_routes()
except CriticalValidationError as e:
logging.error(
"Critical validation error while processing %s: %s",
c.name,
str(e),
)
c.error(str(e))
except AssertionError as e:
logging.error(
"Validation logic error while processing %s: %s",
c.name,
str(e),
)
c.error("Validation logic error: {}".format(str(e)))
else:
c.validate()
if c.is_good():
good_cities.append(c)
logging.info("Building routes for each city")
good_cities = validate_cities(cities)
logging.info('Finding transfer stations')
logging.info("Finding transfer stations")
transfers = find_transfers(osm, cities)
good_city_names = set(c.name for c in good_cities)
logging.info(
'%s good cities: %s',
"%s good cities: %s",
len(good_city_names),
', '.join(sorted(good_city_names)),
", ".join(sorted(good_city_names)),
)
bad_city_names = set(c.name for c in cities) - good_city_names
logging.info(
'%s bad cities: %s',
"%s bad cities: %s",
len(bad_city_names),
', '.join(sorted(bad_city_names)),
", ".join(sorted(bad_city_names)),
)
if options.recovery_path:
@@ -355,48 +490,55 @@ if __name__ == '__main__':
if os.path.isdir(options.dump):
for c in cities:
with open(
os.path.join(options.dump, slugify(c.name) + '.yaml'),
'w',
encoding='utf-8',
os.path.join(options.dump, slugify(c.name) + ".yaml"),
"w",
encoding="utf-8",
) as f:
dump_yaml(c, f)
elif len(cities) == 1:
with open(options.dump, 'w', encoding='utf-8') as f:
with open(options.dump, "w", encoding="utf-8") as f:
dump_yaml(cities[0], f)
else:
logging.error('Cannot dump %s cities at once', len(cities))
logging.error("Cannot dump %s cities at once", len(cities))
if options.geojson:
if os.path.isdir(options.geojson):
for c in cities:
with open(
os.path.join(
options.geojson, slugify(c.name) + '.geojson'
options.geojson, slugify(c.name) + ".geojson"
),
'w',
encoding='utf-8',
"w",
encoding="utf-8",
) as f:
json.dump(make_geojson(c, not options.crude), f)
elif len(cities) == 1:
with open(options.geojson, 'w', encoding='utf-8') as f:
with open(options.geojson, "w", encoding="utf-8") as f:
json.dump(make_geojson(cities[0], not options.crude), f)
else:
logging.error(
'Cannot make a geojson of %s cities at once', len(cities)
"Cannot make a geojson of %s cities at once", len(cities)
)
if options.log:
res = []
for c in cities:
v = c.get_validation_result()
v['slug'] = slugify(c.name)
v["slug"] = slugify(c.name)
res.append(v)
json.dump(res, options.log, indent=2, ensure_ascii=False)
if options.output:
json.dump(
processor.process(cities, transfers, options.cache),
options.output,
indent=1,
ensure_ascii=False,
)
for processor_name, processor in inspect.getmembers(
processors, inspect.ismodule
):
option_name = f"output_{processor_name}"
if not getattr(options, option_name, None):
continue
filename = getattr(options, option_name)
processor.process(cities, transfers, filename, options.cache)
if __name__ == "__main__":
main()

processors/__init__.py

@@ -1,2 +1,4 @@
# Here you can change the processor
from . import mapsme as processor
# Import only those processors (modules) you want to use.
# Ignore F401 "module imported but unused" violation since these modules
# are addressed via introspection.
from . import mapsme, gtfs # noqa F401

processors/_common.py (new file, 108 lines)

@@ -0,0 +1,108 @@
from typing import List, Set
from subway_structure import City, el_center, StopArea
DEFAULT_INTERVAL = 2.5 * 60 # seconds
KMPH_TO_MPS = 1 / 3.6 # km/h to m/s conversion multiplier
SPEED_ON_TRANSFER = 3.5 * KMPH_TO_MPS # m/s
TRANSFER_PENALTY = 30 # seconds
def format_colour(colour):
"""Truncate leading # sign."""
return colour[1:] if colour else None
def transit_to_dict(
cities: List[City], transfers: List[Set[StopArea]]
) -> dict:
"""Get data for good cities as a dictionary."""
data = {
"stopareas": {}, # stoparea id => stoparea data
"networks": {}, # city name => city data
"transfers": {}, # set(tuple(stoparea_id1, stoparea_id2)), id1<id2
}
for city in (c for c in cities if c.is_good):
network = {
"id": city.id,
"name": city.name,
"routes": [],
}
for route_master in city:
route_data = {
"id": route_master.id,
"mode": route_master.mode,
"ref": route_master.ref,
"name": route_master.name,
"colour": route_master.colour,
"infill": route_master.infill,
"itineraries": [],
}
for route in route_master:
variant_data = {
"id": route.id,
"tracks": route.get_tracks_geometry(),
"start_time": route.start_time,
"end_time": route.end_time,
"interval": route.interval,
"stops": [
{
"stoparea_id": route_stop.stoparea.id,
"distance": route_stop.distance,
}
for route_stop in route.stops
],
}
# Store stopareas participating in the route
# and that have not been stored yet
for route_stop in route.stops:
stoparea = route_stop.stoparea
if stoparea.id in data["stopareas"]:
continue
stoparea_data = {
"id": stoparea.id,
"center": stoparea.center,
"name": stoparea.station.name,
"entrances": [
{
"id": egress_id,
"name": egress["tags"].get("name"),
"ref": egress["tags"].get("ref"),
"center": el_center(egress),
}
for (egress_id, egress) in (
(egress_id, city.elements[egress_id])
for egress_id in stoparea.entrances
| stoparea.exits
)
],
}
data["stopareas"][stoparea.id] = stoparea_data
route_data["itineraries"].append(variant_data)
network["routes"].append(route_data)
data["networks"][city.name] = network
# transfers
pairwise_transfers = set()
for stoparea_set in transfers:
stoparea_list = list(stoparea_set)
for first_i in range(len(stoparea_list) - 1):
for second_i in range(first_i + 1, len(stoparea_list)):
stoparea1_id = stoparea_list[first_i].id
stoparea2_id = stoparea_list[second_i].id
if all(
st_id in data["stopareas"]
for st_id in (stoparea1_id, stoparea2_id)
):
id1, id2 = sorted([stoparea1_id, stoparea2_id])
pairwise_transfers.add((id1, id2))
data["transfers"] = pairwise_transfers
return data
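Note: the nested index loops enumerate each unordered pair of stopareas within a transfer set exactly once; the same enumeration written with itertools.combinations on bare ids (a standalone sketch, not the module's code):

```
from itertools import combinations

stoparea_set = {"sa1", "sa2", "sa3"}
known = {"sa1", "sa2", "sa3"}  # stands in for data["stopareas"]
pairwise = {
    tuple(sorted(pair))
    for pair in combinations(stoparea_set, 2)
    if all(st_id in known for st_id in pair)
}
print(sorted(pairwise))  # [('sa1', 'sa2'), ('sa1', 'sa3'), ('sa2', 'sa3')]
```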

processors/gtfs.py (new file, 395 lines)

@@ -0,0 +1,395 @@
import csv
from functools import partial
from io import BytesIO, StringIO
from itertools import permutations
from tarfile import TarFile, TarInfo
from typing import List, Optional, Set
from zipfile import ZipFile
from ._common import (
DEFAULT_INTERVAL,
format_colour,
SPEED_ON_TRANSFER,
TRANSFER_PENALTY,
transit_to_dict,
)
from subway_structure import (
City,
distance,
StopArea,
)
DEFAULT_TRIP_START_TIME = (5, 0) # 05:00
DEFAULT_TRIP_END_TIME = (1, 0) # 01:00
COORDINATE_PRECISION = 7 # fractional digits. It's OSM precision, ~ 5 cm
GTFS_COLUMNS = {
"agency": [
"agency_id",
"agency_name",
"agency_url",
"agency_timezone",
"agency_lang",
"agency_phone",
],
"routes": [
"route_id",
"agency_id",
"route_short_name",
"route_long_name",
"route_desc",
"route_type",
"route_url",
"route_color",
"route_text_color",
"route_sort_order",
"route_fare_class",
"line_id",
"listed_route",
],
"trips": [
"route_id",
"service_id",
"trip_id",
"trip_headsign",
"trip_short_name",
"direction_id",
"block_id",
"shape_id",
"wheelchair_accessible",
"trip_route_type",
"route_pattern_id",
"bikes_allowed",
],
"stops": [
"stop_id",
"stop_code",
"stop_name",
"stop_desc",
"platform_code",
"platform_name",
"stop_lat",
"stop_lon",
"zone_id",
"stop_address",
"stop_url",
"level_id",
"location_type",
"parent_station",
"wheelchair_boarding",
"municipality",
"on_street",
"at_street",
"vehicle_type",
],
"calendar": [
"service_id",
"monday",
"tuesday",
"wednesday",
"thursday",
"friday",
"saturday",
"sunday",
"start_date",
"end_date",
],
"stop_times": [
"trip_id",
"arrival_time",
"departure_time",
"stop_id",
"stop_sequence",
"stop_headsign",
"pickup_type",
"drop_off_type",
"shape_dist_traveled",
"timepoint",
"checkpoint_id",
"continuous_pickup",
"continuous_drop_off",
],
"frequencies": [
"trip_id",
"start_time",
"end_time",
"headway_secs",
"exact_times",
],
"shapes": [
"shape_id",
"shape_pt_lat",
"shape_pt_lon",
"shape_pt_sequence",
"shape_dist_traveled",
],
"transfers": [
"from_stop_id",
"to_stop_id",
"transfer_type",
"min_transfer_time",
],
}
def round_coords(coords_tuple):
return tuple(
map(lambda coord: round(coord, COORDINATE_PRECISION), coords_tuple)
)
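Note: COORDINATE_PRECISION = 7 trims coordinates to OSM's native precision, e.g.:

```
print(round_coords((37.617634901, 55.755814019)))  # (37.6176349, 55.755814)
```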
def transit_data_to_gtfs(data):
# Keys correspond to GTFS file names
gtfs_data = {key: [] for key in GTFS_COLUMNS.keys()}
# GTFS calendar
gtfs_data["calendar"].append(
{
"service_id": "always",
"monday": 1,
"tuesday": 1,
"wednesday": 1,
"thursday": 1,
"friday": 1,
"saturday": 1,
"sunday": 1,
"start_date": "19700101",
"end_date": "30000101",
}
)
# GTFS stops
for stoparea_id, stoparea_data in data["stopareas"].items():
station_id = f"{stoparea_id}_st"
station_name = stoparea_data["name"]
station_center = round_coords(stoparea_data["center"])
station_gtfs = {
"stop_id": station_id,
"stop_code": station_id,
"stop_name": station_name,
"stop_lat": station_center[1],
"stop_lon": station_center[0],
"location_type": 1, # station in GTFS terms
}
gtfs_data["stops"].append(station_gtfs)
platform_id = f"{stoparea_id}_plt"
platform_gtfs = {
"stop_id": platform_id,
"stop_code": platform_id,
"stop_name": station_name,
"stop_lat": station_center[1],
"stop_lon": station_center[0],
"location_type": 0, # stop/platform in GTFS terms
"parent_station": station_id,
}
gtfs_data["stops"].append(platform_gtfs)
if not stoparea_data["entrances"]:
entrance_id = f"{stoparea_id}_egress"
entrance_gtfs = {
"stop_id": entrance_id,
"stop_code": entrance_id,
"stop_name": station_name,
"stop_lat": station_center[1],
"stop_lon": station_center[0],
"location_type": 2,
"parent_station": station_id,
}
gtfs_data["stops"].append(entrance_gtfs)
else:
for entrance in stoparea_data["entrances"]:
entrance_id = f"{entrance['id']}_{stoparea_id}"
entrance_name = entrance["name"]
if not entrance["name"]:
entrance_name = station_name
ref = entrance["ref"]
if ref:
entrance_name += f" {ref}"
center = round_coords(entrance["center"])
entrance_gtfs = {
"stop_id": entrance_id,
"stop_code": entrance_id,
"stop_name": entrance_name,
"stop_lat": center[1],
"stop_lon": center[0],
"location_type": 2,
"parent_station": station_id,
}
gtfs_data["stops"].append(entrance_gtfs)
# agency, routes, trips, stop_times, frequencies, shapes
for network in data["networks"].values():
agency = {
"agency_id": network["id"],
"agency_name": network["name"],
}
gtfs_data["agency"].append(agency)
for route_master in network["routes"]:
route = {
"route_id": route_master["id"],
"agency_id": network["id"],
"route_type": 12 if route_master["mode"] == "monorail" else 1,
"route_short_name": route_master["ref"],
"route_long_name": route_master["name"],
"route_color": format_colour(route_master["colour"]),
}
gtfs_data["routes"].append(route)
for itinerary in route_master["itineraries"]:
shape_id = itinerary["id"][1:] # truncate leading 'r'
trip = {
"trip_id": itinerary["id"],
"route_id": route_master["id"],
"service_id": "always",
"shape_id": shape_id,
}
gtfs_data["trips"].append(trip)
for i, (lon, lat) in enumerate(itinerary["tracks"]):
lon, lat = round_coords((lon, lat))
gtfs_data["shapes"].append(
{
"shape_id": shape_id,
"trip_id": itinerary["id"],
"shape_pt_lat": lat,
"shape_pt_lon": lon,
"shape_pt_sequence": i,
}
)
start_time = itinerary["start_time"] or DEFAULT_TRIP_START_TIME
end_time = itinerary["end_time"] or DEFAULT_TRIP_END_TIME
if end_time <= start_time:
end_time = (end_time[0] + 24, end_time[1])
start_time = f"{start_time[0]:02d}:{start_time[1]:02d}:00"
end_time = f"{end_time[0]:02d}:{end_time[1]:02d}:00"
gtfs_data["frequencies"].append(
{
"trip_id": itinerary["id"],
"start_time": start_time,
"end_time": end_time,
"headway_secs": itinerary["interval"]
or DEFAULT_INTERVAL,
}
)
for i, route_stop in enumerate(itinerary["stops"]):
platform_id = f"{route_stop['stoparea_id']}_plt"
gtfs_data["stop_times"].append(
{
"trip_id": itinerary["id"],
"stop_sequence": i,
"shape_dist_traveled": route_stop["distance"],
"stop_id": platform_id,
}
)
# transfers
for stoparea1_id, stoparea2_id in data["transfers"]:
stoparea1 = data["stopareas"][stoparea1_id]
stoparea2 = data["stopareas"][stoparea2_id]
transfer_time = TRANSFER_PENALTY + round(
distance(stoparea1["center"], stoparea2["center"])
/ SPEED_ON_TRANSFER
)
gtfs_sa_id1 = f"{stoparea1['id']}_st"
gtfs_sa_id2 = f"{stoparea2['id']}_st"
for id1, id2 in permutations((gtfs_sa_id1, gtfs_sa_id2)):
gtfs_data["transfers"].append(
{
"from_stop_id": id1,
"to_stop_id": id2,
"transfer_type": 0,
"min_transfer_time": transfer_time,
}
)
return gtfs_data
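Note: the start/end time arithmetic above relies on the GTFS convention that times may exceed 24:00:00 for service running past midnight, so a 05:00-01:00 span becomes 05:00:00-25:00:00:

```
start_time = (5, 0)
end_time = (1, 0)
if end_time <= start_time:
    end_time = (end_time[0] + 24, end_time[1])
print(f"{start_time[0]:02d}:{start_time[1]:02d}:00")  # 05:00:00
print(f"{end_time[0]:02d}:{end_time[1]:02d}:00")      # 25:00:00
```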
def process(
cities: List[City],
transfers: List[Set[StopArea]],
filename: str,
cache_path: str,
):
"""Generate all output and save to file.
:param cities: List of City instances
:param transfers: List of sets of StopArea objects
:param filename: Path to file to save the result
:param cache_path: Path to json-file with good cities cache or None.
"""
transit_data = transit_to_dict(cities, transfers)
gtfs_data = transit_data_to_gtfs(transit_data)
# TODO: make universal cache for all processors,
# and apply the cache to GTFS
make_gtfs(filename, gtfs_data)
def dict_to_row(dict_data: dict, record_type: str) -> list:
"""Given object stored in a dict and an array of columns,
return a row to use in CSV.
"""
return [
"" if (v := dict_data.get(column)) is None else v
for column in GTFS_COLUMNS[record_type]
]
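A minimal sketch of dict_to_row() in use, mirroring what the GTFS tests added later in this changeset assert (values are illustrative):
row = dict_to_row(
    {"route_id": 1, "service_id": "a", "trip_id": "tr_123"}, "trips"
)
# The first three trip columns are route_id, service_id, trip_id;
# absent or None fields become empty strings, one per remaining column.
assert row[:3] == [1, "a", "tr_123"]
assert all(value == "" for value in row[3:])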
def make_gtfs(
filename: str, gtfs_data: dict, fmt: Optional[str] = None
) -> None:
if not fmt:
fmt = "tar" if filename.endswith(".tar") else "zip"
if fmt == "zip":
make_gtfs_zip(filename, gtfs_data)
else:
make_gtfs_tar(filename, gtfs_data)
def make_gtfs_zip(filename: str, gtfs_data: dict) -> None:
if not filename.lower().endswith(".zip"):
filename = f"{filename}.zip"
with ZipFile(filename, "w") as zf:
for gtfs_feature, columns in GTFS_COLUMNS.items():
with StringIO(newline="") as string_io:
writer = csv.writer(string_io, delimiter=",")
writer.writerow(columns)
writer.writerows(
map(
partial(dict_to_row, record_type=gtfs_feature),
gtfs_data[gtfs_feature],
)
)
zf.writestr(f"{gtfs_feature}.txt", string_io.getvalue())
def make_gtfs_tar(filename: str, gtfs_data: dict) -> None:
if not filename.lower().endswith(".tar"):
filename = f"{filename}.tar"
with TarFile(filename, "w") as tf:
for gtfs_feature, columns in GTFS_COLUMNS.items():
with StringIO(newline="") as string_io:
writer = csv.writer(string_io, delimiter=",")
writer.writerow(columns)
writer.writerows(
map(
partial(dict_to_row, record_type=gtfs_feature),
gtfs_data[gtfs_feature],
)
)
tarinfo = TarInfo(f"{gtfs_feature}.txt")
data = string_io.getvalue().encode()
tarinfo.size = len(data)
tf.addfile(tarinfo, BytesIO(data))
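A minimal usage sketch of the writers above (the file names are hypothetical):
gtfs_data = transit_data_to_gtfs(transit_data)
make_gtfs("city.zip", gtfs_data)         # format inferred from extension
make_gtfs("city.tar", gtfs_data)         # plain uncompressed tar feed
make_gtfs("city", gtfs_data, fmt="tar")  # explicit format, ".tar" appended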


@@ -1,23 +1,27 @@
import json
import os
import logging
import os
from collections import defaultdict
from subway_structure import (
DISPLACEMENT_TOLERANCE,
distance,
el_center,
Station,
DISPLACEMENT_TOLERANCE,
)
from ._common import (
DEFAULT_INTERVAL,
format_colour,
KMPH_TO_MPS,
SPEED_ON_TRANSFER,
TRANSFER_PENALTY,
)
OSM_TYPES = {'n': (0, 'node'), 'w': (2, 'way'), 'r': (3, 'relation')}
OSM_TYPES = {"n": (0, "node"), "w": (2, "way"), "r": (3, "relation")}
ENTRANCE_PENALTY = 60 # seconds
TRANSFER_PENALTY = 30 # seconds
KMPH_TO_MPS = 1 / 3.6 # km/h to m/s conversion multiplier
SPEED_TO_ENTRANCE = 5 * KMPH_TO_MPS # m/s
SPEED_ON_TRANSFER = 3.5 * KMPH_TO_MPS # m/s
SPEED_ON_LINE = 40 * KMPH_TO_MPS # m/s
DEFAULT_INTERVAL = 2.5 # minutes
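# A worked example of how these constants combine, following the
# entrance-time formula used further down in this file (the 100 m
# figure is illustrative):
#   ENTRANCE_PENALTY + round(100 / SPEED_TO_ENTRANCE)
#     = 60 + round(100 / (5 / 3.6)) = 60 + 72 = 132 seconds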
def uid(elid, typ=None):
@@ -26,7 +30,7 @@ def uid(elid, typ=None):
if not typ:
osm_id = (osm_id << 2) + OSM_TYPES[t][0]
elif typ != t:
raise Exception('Got {}, expected {}'.format(elid, typ))
raise Exception("Got {}, expected {}".format(elid, typ))
return osm_id << 1
@@ -61,7 +65,8 @@ def if_object_is_used(method):
class MapsmeCache:
def __init__(self, cache_path, cities):
if not cache_path:
# cache is not used, all actions with cache must be silently skipped
# Cache is not used,
# all actions with cache must be silently skipped
self.is_used = False
return
self.cache_path = cache_path
@@ -69,7 +74,7 @@ class MapsmeCache:
self.cache = {}
if os.path.exists(cache_path):
try:
with open(cache_path, 'r', encoding='utf-8') as f:
with open(cache_path, "r", encoding="utf-8") as f:
self.cache = json.load(f)
except json.decoder.JSONDecodeError:
logging.warning(
@@ -81,16 +86,16 @@ class MapsmeCache:
# One stoparea may participate in routes of different cities
self.stop_cities = defaultdict(set) # stoparea id -> city names
self.city_dict = {c.name: c for c in cities}
self.good_city_names = {c.name for c in cities if c.is_good()}
self.good_city_names = {c.name for c in cities if c.is_good}
def _is_cached_city_usable(self, city):
"""Check if cached stations still exist in osm data and
not moved far away.
"""
city_cache_data = self.cache[city.name]
for stoparea_id, cached_stoparea in city_cache_data['stops'].items():
station_id = cached_stoparea['osm_type'][0] + str(
cached_stoparea['osm_id']
for stoparea_id, cached_stoparea in city_cache_data["stops"].items():
station_id = cached_stoparea["osm_type"][0] + str(
cached_stoparea["osm_id"]
)
city_station = city.elements.get(station_id)
if not city_station or not Station.is_station(
@@ -99,7 +104,7 @@ class MapsmeCache:
return False
station_coords = el_center(city_station)
cached_station_coords = tuple(
cached_stoparea[coord] for coord in ('lon', 'lat')
cached_stoparea[coord] for coord in ("lon", "lat")
)
displacement = distance(station_coords, cached_station_coords)
if displacement > DISPLACEMENT_TOLERANCE:
@@ -112,11 +117,11 @@ class MapsmeCache:
"""Put stops and networks for bad cities into containers
passed as arguments."""
for city in self.city_dict.values():
if not city.is_good() and city.name in self.cache:
if not city.is_good and city.name in self.cache:
city_cached_data = self.cache[city.name]
if self._is_cached_city_usable(city):
stops.update(city_cached_data['stops'])
networks.append(city_cached_data['network'])
stops.update(city_cached_data["stops"])
networks.append(city_cached_data["network"])
logging.info("Taking %s from cache", city.name)
self.recovered_city_names.add(city.name)
@@ -125,7 +130,7 @@ class MapsmeCache:
"""Add transfers from usable cached cities to 'transfers' dict
passed as argument."""
for city_name in self.recovered_city_names:
city_cached_transfers = self.cache[city_name]['transfers']
city_cached_transfers = self.cache[city_name]["transfers"]
for stop1_uid, stop2_uid, transfer_time in city_cached_transfers:
if (stop1_uid, stop2_uid) not in transfers:
transfers[(stop1_uid, stop2_uid)] = transfer_time
@@ -135,9 +140,10 @@ class MapsmeCache:
"""Create/replace one cache element with new data container.
This should be done for each good city."""
self.cache[city_name] = {
'network': network,
'stops': {}, # stoparea el_id -> jsonified stop data
'transfers': [], # list of tuples (stoparea1_uid, stoparea2_uid, time); uid1 < uid2
"network": network,
"stops": {}, # stoparea el_id -> jsonified stop data
"transfers": [], # list of tuples
# (stoparea1_uid, stoparea2_uid, time); uid1 < uid2
}
@if_object_is_used
@@ -151,7 +157,7 @@ class MapsmeCache:
"""Add stoparea to the cache of each city the stoparea is in."""
stoparea_uid = uid(stoparea_id)
for city_name in self.stop_cities[stoparea_uid]:
self.cache[city_name]['stops'][stoparea_id] = st
self.cache[city_name]["stops"][stoparea_id] = st
@if_object_is_used
def add_transfer(self, stoparea1_uid, stoparea2_uid, transfer_time):
@@ -161,40 +167,39 @@ class MapsmeCache:
& self.stop_cities[stoparea1_uid]
& self.stop_cities[stoparea2_uid]
):
self.cache[city_name]['transfers'].append(
self.cache[city_name]["transfers"].append(
(stoparea1_uid, stoparea2_uid, transfer_time)
)
@if_object_is_used
def save(self):
try:
with open(self.cache_path, 'w', encoding='utf-8') as f:
with open(self.cache_path, "w", encoding="utf-8") as f:
json.dump(self.cache, f, ensure_ascii=False)
except Exception as e:
logging.warning("Failed to save cache: %s", str(e))
def process(cities, transfers, cache_path):
"""cities - list of City instances;
transfers - list of sets of StopArea.id;
cache_path - path to json-file with good cities cache or None.
def process(cities, transfers, filename, cache_path):
"""Generate all output and save to file.
:param cities: List of City instances
:param transfers: List of sets of StopArea.id
:param filename: Path to file to save the result
:param cache_path: Path to json-file with good cities cache or None.
"""
def format_colour(c):
return c[1:] if c else None
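# E.g. format_colour("#ff2600") -> "ff2600"; format_colour(None) -> None.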
def find_exits_for_platform(center, nodes):
exits = []
min_distance = None
for n in nodes:
d = distance(center, (n['lon'], n['lat']))
d = distance(center, (n["lon"], n["lat"]))
if not min_distance:
min_distance = d * 2 / 3
elif d < min_distance:
continue
too_close = False
for e in exits:
d = distance((e['lon'], e['lat']), (n['lon'], n['lat']))
d = distance((e["lon"], e["lat"]), (n["lon"], n["lat"]))
if d < min_distance:
too_close = True
break
@@ -207,25 +212,25 @@ def process(cities, transfers, cache_path):
stop_areas = {} # stoparea el_id -> StopArea instance
stops = {} # stoparea el_id -> stop jsonified data
networks = []
good_cities = [c for c in cities if c.is_good()]
good_cities = [c for c in cities if c.is_good]
platform_nodes = {}
cache.provide_stops_and_networks(stops, networks)
for city in good_cities:
network = {'network': city.name, 'routes': [], 'agency_id': city.id}
network = {"network": city.name, "routes": [], "agency_id": city.id}
cache.initialize_good_city(city.name, network)
for route in city:
routes = {
'type': route.mode,
'ref': route.ref,
'name': route.name,
'colour': format_colour(route.colour),
'route_id': uid(route.id, 'r'),
'itineraries': [],
"type": route.mode,
"ref": route.ref,
"name": route.name,
"colour": format_colour(route.colour),
"route_id": uid(route.id, "r"),
"itineraries": [],
}
if route.infill:
routes['casing'] = routes['colour']
routes['colour'] = format_colour(route.infill)
routes["casing"] = routes["colour"]
routes["colour"] = format_colour(route.infill)
for i, variant in enumerate(route):
itin = []
for stop in variant:
@@ -237,41 +242,42 @@ def process(cities, transfers, cache_path):
round(stop.distance / SPEED_ON_LINE),
]
)
# Make exits from platform nodes, if we don't have proper exits
# Make exits from platform nodes,
# if we don't have proper exits
if (
len(stop.stoparea.entrances) + len(stop.stoparea.exits)
== 0
):
for pl in stop.stoparea.platforms:
pl_el = city.elements[pl]
if pl_el['type'] == 'node':
if pl_el["type"] == "node":
pl_nodes = [pl_el]
elif pl_el['type'] == 'way':
elif pl_el["type"] == "way":
pl_nodes = [
city.elements.get('n{}'.format(n))
for n in pl_el['nodes']
city.elements.get("n{}".format(n))
for n in pl_el["nodes"]
]
else:
pl_nodes = []
for m in pl_el['members']:
if m['type'] == 'way':
for m in pl_el["members"]:
if m["type"] == "way":
if (
'{}{}'.format(
m['type'][0], m['ref']
"{}{}".format(
m["type"][0], m["ref"]
)
in city.elements
):
pl_nodes.extend(
[
city.elements.get(
'n{}'.format(n)
"n{}".format(n)
)
for n in city.elements[
'{}{}'.format(
m['type'][0],
m['ref'],
"{}{}".format(
m["type"][0],
m["ref"],
)
]['nodes']
]["nodes"]
]
)
pl_nodes = [n for n in pl_nodes if n]
@@ -279,39 +285,39 @@ def process(cities, transfers, cache_path):
stop.stoparea.centers[pl], pl_nodes
)
routes['itineraries'].append(
routes["itineraries"].append(
{
'stops': itin,
'interval': round(
(variant.interval or DEFAULT_INTERVAL) * 60
"stops": itin,
"interval": round(
variant.interval or DEFAULT_INTERVAL
),
}
)
network['routes'].append(routes)
network["routes"].append(routes)
networks.append(network)
for stop_id, stop in stop_areas.items():
st = {
'name': stop.name,
'int_name': stop.int_name,
'lat': stop.center[1],
'lon': stop.center[0],
'osm_type': OSM_TYPES[stop.station.id[0]][1],
'osm_id': int(stop.station.id[1:]),
'id': uid(stop.id),
'entrances': [],
'exits': [],
"name": stop.name,
"int_name": stop.int_name,
"lat": stop.center[1],
"lon": stop.center[0],
"osm_type": OSM_TYPES[stop.station.id[0]][1],
"osm_id": int(stop.station.id[1:]),
"id": uid(stop.id),
"entrances": [],
"exits": [],
}
for e_l, k in ((stop.entrances, 'entrances'), (stop.exits, 'exits')):
for e_l, k in ((stop.entrances, "entrances"), (stop.exits, "exits")):
for e in e_l:
if e[0] == 'n':
if e[0] == "n":
st[k].append(
{
'osm_type': 'node',
'osm_id': int(e[1:]),
'lon': stop.centers[e][0],
'lat': stop.centers[e][1],
'distance': ENTRANCE_PENALTY
"osm_type": "node",
"osm_id": int(e[1:]),
"lon": stop.centers[e][0],
"lat": stop.centers[e][1],
"distance": ENTRANCE_PENALTY
+ round(
distance(stop.centers[e], stop.center)
/ SPEED_TO_ENTRANCE
@@ -322,31 +328,31 @@ def process(cities, transfers, cache_path):
if stop.platforms:
for pl in stop.platforms:
for n in platform_nodes[pl]:
for k in ('entrances', 'exits'):
for k in ("entrances", "exits"):
st[k].append(
{
'osm_type': n['type'],
'osm_id': n['id'],
'lon': n['lon'],
'lat': n['lat'],
'distance': ENTRANCE_PENALTY
"osm_type": n["type"],
"osm_id": n["id"],
"lon": n["lon"],
"lat": n["lat"],
"distance": ENTRANCE_PENALTY
+ round(
distance(
(n['lon'], n['lat']), stop.center
(n["lon"], n["lat"]), stop.center
)
/ SPEED_TO_ENTRANCE
),
}
)
else:
for k in ('entrances', 'exits'):
for k in ("entrances", "exits"):
st[k].append(
{
'osm_type': OSM_TYPES[stop.station.id[0]][1],
'osm_id': int(stop.station.id[1:]),
'lon': stop.centers[stop.id][0],
'lat': stop.centers[stop.id][1],
'distance': 60,
"osm_type": OSM_TYPES[stop.station.id[0]][1],
"osm_id": int(stop.station.id[1:]),
"lon": stop.centers[stop.id][0],
"lat": stop.centers[stop.id][1],
"distance": 60,
}
)
@@ -382,8 +388,18 @@ def process(cities, transfers, cache_path):
]
result = {
'stops': list(stops.values()),
'transfers': pairwise_transfers,
'networks': networks,
"stops": list(stops.values()),
"transfers": pairwise_transfers,
"networks": networks,
}
return result
if not filename.lower().endswith("json"):
filename = f"{filename}.json"
with open(filename, "w", encoding="utf-8") as f:
json.dump(
result,
f,
indent=1,
ensure_ascii=False,
)


@@ -1,10 +1 @@
Flask==2.0.1
kdtree==0.16
lxml==4.6.3
Shapely==1.7.1
## The following requirements were added by pip freeze:
click==8.0.1
itsdangerous==2.0.1
Jinja2==3.0.1
MarkupSafe==2.0.1
Werkzeug==2.0.1
lxml==4.9.2


@@ -32,14 +32,16 @@ Environment variable reference:
- PLANET_METRO: path to a local o5m file with extract of cities having metro
It is used instead of \$PLANET if it exists; otherwise it is created first
- PLANET_UPDATE_SERVER: server to get replication data from. Defaults to https://planet.openstreetmap.org/replication/
- CITIES_INFO_URL: http(s) or "file://" URL to a CSV file with reference information about rapid transit systems. The default value is hardcoded in the Python code.
- CITY: name of a city/country to process
- BBOX: bounding box of an extract; x1,y1,x2,y2. Has precedence over \$POLY
- POLY: *.poly file with [multi]polygon comprising cities with metro
If neither \$BBOX nor \$POLY is set, then \$POLY is generated
- SKIP_PLANET_UPDATE: skip \$PLANET file update. Any non-empty string is True
- SKIP_PLANET_UPDATE: skip \$PLANET_METRO file update. Any non-empty string is True
- SKIP_FILTERING: skip filtering railway data. Any non-empty string is True
- FILTERED_DATA: path to filtered data. Defaults to \$TMPDIR/subways.osm
- MAPSME: file name for maps.me json output
- GTFS: file name for GTFS output
- DUMP: directory/file name to dump YAML city data. Do not set to omit dump
- GEOJSON: directory/file name to dump GeoJSON data. Do not set to omit dump
- ELEMENTS_CACHE: file name to elements cache. Allows OSM xml processing phase
@@ -54,6 +56,7 @@ Environment variable reference:
- SERVER: server name and path to upload HTML files (e.g. ilya@osmz.ru:/var/www/)
- SERVER_KEY: rsa key to supply for uploading the files
- REMOVE_HTML: set to 1 to remove \$HTML_DIR after uploading
- QUIET: set to any non-empty value to use WARNING log level in process_subways.py. Default is INFO.
EOF
exit
fi
@@ -88,9 +91,10 @@ function check_poly() {
if [ -z "${POLY-}" -o ! -f "${POLY-}" ]; then
POLY=${POLY:-$(mktemp "$TMPDIR/all-metro.XXXXXXXX.poly")}
if [ -n "$("$PYTHON" -c "import shapely" 2>&1)" ]; then
"$PYTHON" -m pip install shapely
"$PYTHON" -m pip install shapely==1.7.1
fi
"$PYTHON" "$SUBWAYS_PATH"/make_all_metro_poly.py > "$POLY"
"$PYTHON" "$SUBWAYS_PATH"/make_all_metro_poly.py \
${CITIES_INFO_URL:+--cities-info-url "$CITIES_INFO_URL"} > "$POLY"
fi
fi
POLY_CHECKED=1
@@ -235,10 +239,16 @@ if [ -n "${DUMP-}" ]; then
mkdir -p "$DUMP"
fi
if [ -n "${DUMP-}" ]; then
mkdir -p "$DUMP"
fi
VALIDATION="$TMPDIR/validation.json"
"$PYTHON" "$SUBWAYS_PATH/process_subways.py" -q \
"$PYTHON" "$SUBWAYS_PATH/process_subways.py" ${QUIET:+-q} \
-x "$FILTERED_DATA" -l "$VALIDATION" \
${MAPSME:+-o "$MAPSME"} \
${CITIES_INFO_URL:+--cities-info-url "$CITIES_INFO_URL"} \
${MAPSME:+--output-mapsme "$MAPSME"} \
${GTFS:+--output-gtfs "$GTFS"} \
${CITY:+-c "$CITY"} ${DUMP:+-d "$DUMP"} ${GEOJSON:+-j "$GEOJSON"} \
${ELEMENTS_CACHE:+-i "$ELEMENTS_CACHE"} \
${CITY_CACHE:+--cache "$CITY_CACHE"} \
@@ -257,7 +267,9 @@ fi
mkdir -p $HTML_DIR
rm -f "$HTML_DIR"/*.html
"$PYTHON" "$SUBWAYS_PATH/validation_to_html.py" "$VALIDATION" "$HTML_DIR"
"$PYTHON" "$SUBWAYS_PATH/validation_to_html.py" \
${CITIES_INFO_URL:+--cities-info-url "$CITIES_INFO_URL"} \
"$VALIDATION" "$HTML_DIR"
# Uploading files to the server


@@ -1,14 +1,15 @@
#!/usr/bin/env python3
import json
import codecs
from lxml import etree
import sys
import kdtree
import json
import math
import re
import sys
import urllib.parse
import urllib.request
import kdtree
from lxml import etree
QUERY = """
[out:json][timeout:250][bbox:{{bbox}}];
@@ -32,17 +33,17 @@ out meta center qt;
def el_id(el):
return el['type'][0] + str(el.get('id', el.get('ref', '')))
return el["type"][0] + str(el.get("id", el.get("ref", "")))
class StationWrapper:
def __init__(self, st):
if 'center' in st:
self.coords = (st['center']['lon'], st['center']['lat'])
elif 'lon' in st:
self.coords = (st['lon'], st['lat'])
if "center" in st:
self.coords = (st["center"]["lon"], st["center"]["lat"])
elif "lon" in st:
self.coords = (st["lon"], st["lat"])
else:
raise Exception('Coordinates not found for station {}'.format(st))
raise Exception("Coordinates not found for station {}".format(st))
self.station = st
def __len__(self):
@@ -53,85 +54,85 @@ class StationWrapper:
def distance(self, other):
"""Calculate distance in meters."""
dx = math.radians(self[0] - other['lon']) * math.cos(
0.5 * math.radians(self[1] + other['lat'])
dx = math.radians(self[0] - other["lon"]) * math.cos(
0.5 * math.radians(self[1] + other["lat"])
)
dy = math.radians(self[1] - other['lat'])
dy = math.radians(self[1] - other["lat"])
return 6378137 * math.sqrt(dx * dx + dy * dy)
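The method above is the equirectangular approximation of great-circle distance. A self-contained sketch of the same formula, assuming (lon, lat) inputs in degrees as used throughout these scripts:
import math

def equirectangular_distance(p1, p2):
    """Approximate distance in metres between two (lon, lat) points;
    accurate enough at the short station-to-exit ranges used here."""
    dx = math.radians(p1[0] - p2[0]) * math.cos(
        0.5 * math.radians(p1[1] + p2[1])
    )
    dy = math.radians(p1[1] - p2[1])
    # 6378137 m is the WGS84 equatorial radius of the Earth
    return 6378137 * math.hypot(dx, dy)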
def overpass_request(bbox):
url = 'http://overpass-api.de/api/interpreter?data={}'.format(
urllib.parse.quote(QUERY.replace('{{bbox}}', bbox))
url = "http://overpass-api.de/api/interpreter?data={}".format(
urllib.parse.quote(QUERY.replace("{{bbox}}", bbox))
)
response = urllib.request.urlopen(url, timeout=1000)
if response.getcode() != 200:
raise Exception(
'Failed to query Overpass API: HTTP {}'.format(response.getcode())
"Failed to query Overpass API: HTTP {}".format(response.getcode())
)
reader = codecs.getreader('utf-8')
return json.load(reader(response))['elements']
reader = codecs.getreader("utf-8")
return json.load(reader(response))["elements"]
def add_stop_areas(src):
if not src:
raise Exception('Empty dataset provided to add_stop_areas')
raise Exception("Empty dataset provided to add_stop_areas")
# Add station=* tags to stations in subway and light_rail routes
stations = {}
for el in src:
if 'tags' in el and el['tags'].get('railway', None) == 'station':
if "tags" in el and el["tags"].get("railway", None) == "station":
stations[el_id(el)] = el
for el in src:
if (
el['type'] == 'relation'
and 'tags' in el
and el['tags'].get('route', None) in ('subway', 'light_rail')
el["type"] == "relation"
and "tags" in el
and el["tags"].get("route", None) in ("subway", "light_rail")
):
for m in el['members']:
for m in el["members"]:
st = stations.get(el_id(m), None)
if st and 'station' not in st['tags']:
st['tags']['station'] = el['tags']['route']
st['modified'] = True
if st and "station" not in st["tags"]:
st["tags"]["station"] = el["tags"]["route"]
st["modified"] = True
# Create a kd-tree out of subway stations
stations = kdtree.create(dimensions=2)
for el in src:
if 'tags' in el and el['tags'].get('station', None) in (
'subway',
'light_rail',
if "tags" in el and el["tags"].get("station", None) in (
"subway",
"light_rail",
):
stations.add(StationWrapper(el))
if stations.is_leaf:
raise Exception('No stations found')
raise Exception("No stations found")
# Populate a list of nearby subway exits and platforms for each station
MAX_DISTANCE = 300 # meters
stop_areas = {}
for el in src:
if 'tags' not in el:
if "tags" not in el:
continue
if 'station' in el['tags']:
if "station" in el["tags"]:
continue
if el['tags'].get('railway', None) not in (
'subway_entrance',
'platform',
) and el['tags'].get('public_transport', None) not in (
'platform',
'stop_position',
if el["tags"].get("railway", None) not in (
"subway_entrance",
"platform",
) and el["tags"].get("public_transport", None) not in (
"platform",
"stop_position",
):
continue
coords = el.get('center', el)
station = stations.search_nn((coords['lon'], coords['lat']))[0].data
coords = el.get("center", el)
station = stations.search_nn((coords["lon"], coords["lat"]))[0].data
if station.distance(coords) < MAX_DISTANCE:
k = (
station.station['id'],
station.station['tags'].get('name', 'station_with_no_name'),
station.station["id"],
station.station["tags"].get("name", "station_with_no_name"),
)
# Disregard exits and platforms that are differently named
if el['tags'].get('name', k[1]) == k[1]:
if el["tags"].get("name", k[1]) == k[1]:
if k not in stop_areas:
stop_areas[k] = {el_id(station.station): station.station}
stop_areas[k][el_id(el)] = el
@@ -139,11 +140,11 @@ def add_stop_areas(src):
# Find existing stop_area relations for stations and remove these stations
for el in src:
if (
el['type'] == 'relation'
and el['tags'].get('public_transport', None) == 'stop_area'
el["type"] == "relation"
and el["tags"].get("public_transport", None) == "stop_area"
):
found = False
for m in el['members']:
for m in el["members"]:
if found:
break
for st in stop_areas:
@@ -153,89 +154,90 @@ def add_stop_areas(src):
break
# Create OSM XML for new stop_area relations
root = etree.Element('osm', version='0.6')
root = etree.Element("osm", version="0.6")
rid = -1
for st, members in stop_areas.items():
rel = etree.SubElement(root, 'relation', id=str(rid))
rel = etree.SubElement(root, "relation", id=str(rid))
rid -= 1
etree.SubElement(rel, 'tag', k='type', v='public_transport')
etree.SubElement(rel, 'tag', k='public_transport', v='stop_area')
etree.SubElement(rel, 'tag', k='name', v=st[1])
etree.SubElement(rel, "tag", k="type", v="public_transport")
etree.SubElement(rel, "tag", k="public_transport", v="stop_area")
etree.SubElement(rel, "tag", k="name", v=st[1])
for m in members.values():
if (
m['tags'].get(
'railway', m['tags'].get('public_transport', None)
m["tags"].get(
"railway", m["tags"].get("public_transport", None)
)
== 'platform'
== "platform"
):
role = 'platform'
elif m['tags'].get('public_transport', None) == 'stop_position':
role = 'stop'
role = "platform"
elif m["tags"].get("public_transport", None) == "stop_position":
role = "stop"
else:
role = ''
role = ""
etree.SubElement(
rel, 'member', ref=str(m['id']), type=m['type'], role=role
rel, "member", ref=str(m["id"]), type=m["type"], role=role
)
# Add all downloaded elements
for el in src:
obj = etree.SubElement(root, el['type'])
obj = etree.SubElement(root, el["type"])
for a in (
'id',
'type',
'user',
'uid',
'version',
'changeset',
'timestamp',
'lat',
'lon',
"id",
"type",
"user",
"uid",
"version",
"changeset",
"timestamp",
"lat",
"lon",
):
if a in el:
obj.set(a, str(el[a]))
if 'modified' in el:
obj.set('action', 'modify')
if 'tags' in el:
for k, v in el['tags'].items():
etree.SubElement(obj, 'tag', k=k, v=v)
if 'members' in el:
for m in el['members']:
if "modified" in el:
obj.set("action", "modify")
if "tags" in el:
for k, v in el["tags"].items():
etree.SubElement(obj, "tag", k=k, v=v)
if "members" in el:
for m in el["members"]:
etree.SubElement(
obj,
'member',
ref=str(m['ref']),
type=m['type'],
role=m.get('role', ''),
"member",
ref=str(m["ref"]),
type=m["type"],
role=m.get("role", ""),
)
if 'nodes' in el:
for n in el['nodes']:
etree.SubElement(obj, 'nd', ref=str(n))
if "nodes" in el:
for n in el["nodes"]:
etree.SubElement(obj, "nd", ref=str(n))
return etree.tostring(root, pretty_print=True)
if __name__ == '__main__':
if __name__ == "__main__":
if len(sys.argv) < 2:
print(
'Read a JSON from Overpass and output JOSM OSM XML with added stop_area relations'
"Read a JSON from Overpass and output JOSM OSM XML with added "
"stop_area relations"
)
print(
'Usage: {} {{<export.json>|<bbox>}} [output.osm]'.format(
"Usage: {} {{<export.json>|<bbox>}} [output.osm]".format(
sys.argv[0]
)
)
sys.exit(1)
if re.match(r'^[-0-9.,]+$', sys.argv[1]):
if re.match(r"^[-0-9.,]+$", sys.argv[1]):
src = overpass_request(sys.argv[1])
else:
with open(sys.argv[1], 'r') as f:
src = json.load(f)['elements']
with open(sys.argv[1], "r") as f:
src = json.load(f)["elements"]
result = add_stop_areas(src)
if len(sys.argv) < 3:
print(result.decode('utf-8'))
print(result.decode("utf-8"))
else:
with open(sys.argv[2], 'wb') as f:
with open(sys.argv[2], "wb") as f:
f.write(result)


@@ -1,14 +1,15 @@
#!/usr/bin/env python3
import json
import codecs
from lxml import etree
import sys
import kdtree
import json
import math
import re
import sys
import urllib.parse
import urllib.request
import kdtree
from lxml import etree
QUERY = """
[out:json][timeout:250][bbox:{{bbox}}];
@@ -23,17 +24,17 @@ out meta center qt;
def el_id(el):
return el['type'][0] + str(el.get('id', el.get('ref', '')))
return el["type"][0] + str(el.get("id", el.get("ref", "")))
class StationWrapper:
def __init__(self, st):
if 'center' in st:
self.coords = (st['center']['lon'], st['center']['lat'])
elif 'lon' in st:
self.coords = (st['lon'], st['lat'])
if "center" in st:
self.coords = (st["center"]["lon"], st["center"]["lat"])
elif "lon" in st:
self.coords = (st["lon"], st["lat"])
else:
raise Exception('Coordinates not found for station {}'.format(st))
raise Exception("Coordinates not found for station {}".format(st))
self.station = st
def __len__(self):
@@ -44,50 +45,50 @@ class StationWrapper:
def distance(self, other):
"""Calculate distance in meters."""
dx = math.radians(self[0] - other['lon']) * math.cos(
0.5 * math.radians(self[1] + other['lat'])
dx = math.radians(self[0] - other["lon"]) * math.cos(
0.5 * math.radians(self[1] + other["lat"])
)
dy = math.radians(self[1] - other['lat'])
dy = math.radians(self[1] - other["lat"])
return 6378137 * math.sqrt(dx * dx + dy * dy)
def overpass_request(bbox):
url = 'http://overpass-api.de/api/interpreter?data={}'.format(
urllib.parse.quote(QUERY.replace('{{bbox}}', bbox))
url = "http://overpass-api.de/api/interpreter?data={}".format(
urllib.parse.quote(QUERY.replace("{{bbox}}", bbox))
)
response = urllib.request.urlopen(url, timeout=1000)
if response.getcode() != 200:
raise Exception(
'Failed to query Overpass API: HTTP {}'.format(response.getcode())
"Failed to query Overpass API: HTTP {}".format(response.getcode())
)
reader = codecs.getreader('utf-8')
return json.load(reader(response))['elements']
reader = codecs.getreader("utf-8")
return json.load(reader(response))["elements"]
def is_part_of_stop(tags):
if tags.get('public_transport') in ('platform', 'stop_position'):
if tags.get("public_transport") in ("platform", "stop_position"):
return True
if tags.get('railway') == 'platform':
if tags.get("railway") == "platform":
return True
return False
def add_stop_areas(src):
if not src:
raise Exception('Empty dataset provided to add_stop_areas')
raise Exception("Empty dataset provided to add_stop_areas")
# Create a kd-tree out of tram stations
stations = kdtree.create(dimensions=2)
for el in src:
if 'tags' in el and el['tags'].get('railway') == 'tram_stop':
if "tags" in el and el["tags"].get("railway") == "tram_stop":
stations.add(StationWrapper(el))
if stations.is_leaf:
raise Exception('No stations found')
raise Exception("No stations found")
elements = {}
for el in src:
if el.get('tags'):
if el.get("tags"):
elements[el_id(el)] = el
# Populate a list of nearby subway exits and platforms for each station
@@ -96,27 +97,27 @@ def add_stop_areas(src):
for el in src:
# Only tram routes
if (
'tags' not in el
or el['type'] != 'relation'
or el['tags'].get('route') != 'tram'
"tags" not in el
or el["type"] != "relation"
or el["tags"].get("route") != "tram"
):
continue
for m in el['members']:
for m in el["members"]:
if el_id(m) not in elements:
continue
pel = elements[el_id(m)]
if not is_part_of_stop(pel['tags']):
if not is_part_of_stop(pel["tags"]):
continue
if pel['tags'].get('railway') == 'tram_stop':
if pel["tags"].get("railway") == "tram_stop":
continue
coords = pel.get('center', pel)
station = stations.search_nn(
(coords['lon'], coords['lat'])
)[0].data
coords = pel.get("center", pel)
station = stations.search_nn((coords["lon"], coords["lat"]))[
0
].data
if station.distance(coords) < MAX_DISTANCE:
k = (
station.station['id'],
station.station['tags'].get('name', None),
station.station["id"],
station.station["tags"].get("name", None),
)
if k not in stop_areas:
stop_areas[k] = {el_id(station.station): station.station}
@@ -125,11 +126,11 @@ def add_stop_areas(src):
# Find existing stop_area relations for stations and remove these stations
for el in src:
if (
el['type'] == 'relation'
and el['tags'].get('public_transport', None) == 'stop_area'
el["type"] == "relation"
and el["tags"].get("public_transport", None) == "stop_area"
):
found = False
for m in el['members']:
for m in el["members"]:
if found:
break
for st in stop_areas:
@@ -139,81 +140,81 @@ def add_stop_areas(src):
break
# Create OSM XML for new stop_area relations
root = etree.Element('osm', version='0.6')
root = etree.Element("osm", version="0.6")
rid = -1
for st, members in stop_areas.items():
rel = etree.SubElement(root, 'relation', id=str(rid))
rel = etree.SubElement(root, "relation", id=str(rid))
rid -= 1
etree.SubElement(rel, 'tag', k='type', v='public_transport')
etree.SubElement(rel, 'tag', k='public_transport', v='stop_area')
etree.SubElement(rel, "tag", k="type", v="public_transport")
etree.SubElement(rel, "tag", k="public_transport", v="stop_area")
if st[1]:
etree.SubElement(rel, 'tag', k='name', v=st[1])
etree.SubElement(rel, "tag", k="name", v=st[1])
for m in members.values():
etree.SubElement(
rel, 'member', ref=str(m['id']), type=m['type'], role=''
rel, "member", ref=str(m["id"]), type=m["type"], role=""
)
# Add all downloaded elements
for el in src:
obj = etree.SubElement(root, el['type'])
obj = etree.SubElement(root, el["type"])
for a in (
'id',
'type',
'user',
'uid',
'version',
'changeset',
'timestamp',
'lat',
'lon',
"id",
"type",
"user",
"uid",
"version",
"changeset",
"timestamp",
"lat",
"lon",
):
if a in el:
obj.set(a, str(el[a]))
if 'modified' in el:
obj.set('action', 'modify')
if 'tags' in el:
for k, v in el['tags'].items():
etree.SubElement(obj, 'tag', k=k, v=v)
if 'members' in el:
for m in el['members']:
if "modified" in el:
obj.set("action", "modify")
if "tags" in el:
for k, v in el["tags"].items():
etree.SubElement(obj, "tag", k=k, v=v)
if "members" in el:
for m in el["members"]:
etree.SubElement(
obj,
'member',
ref=str(m['ref']),
type=m['type'],
role=m.get('role', ''),
"member",
ref=str(m["ref"]),
type=m["type"],
role=m.get("role", ""),
)
if 'nodes' in el:
for n in el['nodes']:
etree.SubElement(obj, 'nd', ref=str(n))
if "nodes" in el:
for n in el["nodes"]:
etree.SubElement(obj, "nd", ref=str(n))
return etree.tostring(root, pretty_print=True, encoding="utf-8")
if __name__ == '__main__':
if __name__ == "__main__":
if len(sys.argv) < 2:
print(
'Read a JSON from Overpass and output JOSM OSM XML '
'with added stop_area relations'
"Read a JSON from Overpass and output JOSM OSM XML "
"with added stop_area relations"
)
print(
'Usage: {} {{<export.json>|<bbox>}} [output.osm]'.format(
"Usage: {} {{<export.json>|<bbox>}} [output.osm]".format(
sys.argv[0]
)
)
sys.exit(1)
if re.match(r'^[-0-9.,]+$', sys.argv[1]):
bbox = sys.argv[1].split(',')
src = overpass_request(','.join([bbox[i] for i in (1, 0, 3, 2)]))
if re.match(r"^[-0-9.,]+$", sys.argv[1]):
bbox = sys.argv[1].split(",")
src = overpass_request(",".join([bbox[i] for i in (1, 0, 3, 2)]))
else:
with open(sys.argv[1], 'r') as f:
src = json.load(f)['elements']
with open(sys.argv[1], "r") as f:
src = json.load(f)["elements"]
result = add_stop_areas(src)
if len(sys.argv) < 3:
print(result.decode('utf-8'))
print(result.decode("utf-8"))
else:
with open(sys.argv[2], 'wb') as f:
with open(sys.argv[2], "wb") as f:
f.write(result)


@@ -0,0 +1,12 @@
Flask==2.2.3
kdtree==0.16
lxml==4.9.2
## The following requirements were added by pip freeze:
click==8.1.3
importlib-metadata==6.0.0
itsdangerous==2.1.2
Jinja2==3.1.2
MarkupSafe==2.1.2
Werkzeug==2.2.3
zipp==3.13.0


@@ -1,28 +1,30 @@
#!/usr/bin/env python3
from flask import Flask, request, make_response, render_template
from flask import Flask, make_response, render_template, request
from make_stop_areas import add_stop_areas, overpass_request
app = Flask(__name__)
app.debug = True
@app.route('/')
@app.route("/")
def form():
return render_template('index.html')
return render_template("index.html")
@app.route('/process', methods=['GET'])
@app.route("/process", methods=["GET"])
def convert():
src = overpass_request(request.args.get('bbox'))
src = overpass_request(request.args.get("bbox"))
if not src:
return 'No data from overpass, sorry.'
return "No data from overpass, sorry."
result = add_stop_areas(src)
response = make_response(result)
response.headers['Content-Disposition'] = (
'attachment; filename="stop_areas.osm"'
)
response.headers[
"Content-Disposition"
] = 'attachment; filename="stop_areas.osm"'
return response
if __name__ == '__main__':
if __name__ == "__main__":
app.run()


@@ -12,33 +12,33 @@ def load_xml(f):
elements = []
for event, element in etree.iterparse(f):
if element.tag in ('node', 'way', 'relation'):
el = {'type': element.tag, 'id': int(element.get('id'))}
if element.tag == 'node':
for n in ('lat', 'lon'):
if element.tag in ("node", "way", "relation"):
el = {"type": element.tag, "id": int(element.get("id"))}
if element.tag == "node":
for n in ("lat", "lon"):
el[n] = float(element.get(n))
tags = {}
nd = []
members = []
for sub in element:
if sub.tag == 'tag':
tags[sub.get('k')] = sub.get('v')
elif sub.tag == 'nd':
nd.append(int(sub.get('ref')))
elif sub.tag == 'member':
if sub.tag == "tag":
tags[sub.get("k")] = sub.get("v")
elif sub.tag == "nd":
nd.append(int(sub.get("ref")))
elif sub.tag == "member":
members.append(
{
'type': sub.get('type'),
'ref': int(sub.get('ref')),
'role': sub.get('role', ''),
"type": sub.get("type"),
"ref": int(sub.get("ref")),
"role": sub.get("role", ""),
}
)
if tags:
el['tags'] = tags
el["tags"] = tags
if nd:
el['nodes'] = nd
el["nodes"] = nd
if members:
el['members'] = members
el["members"] = members
elements.append(el)
element.clear()
@@ -55,7 +55,7 @@ def _get_yaml_compatible_string(scalar):
if string and (
string[0] in _YAML_SPECIAL_CHARACTERS
or any(seq in string for seq in _YAML_SPECIAL_SEQUENCES)
or string.endswith(':')
or string.endswith(":")
):
string = string.replace("'", "''")
string = "'{}'".format(string)
@@ -63,25 +63,25 @@ def _get_yaml_compatible_string(scalar):
def dump_yaml(city, f):
def write_yaml(data, f, indent=''):
def write_yaml(data, f, indent=""):
if isinstance(data, (set, list)):
f.write('\n')
f.write("\n")
for i in data:
f.write(indent)
f.write('- ')
write_yaml(i, f, indent + ' ')
f.write("- ")
write_yaml(i, f, indent + " ")
elif isinstance(data, dict):
f.write('\n')
f.write("\n")
for k, v in data.items():
if v is None:
continue
f.write(indent + _get_yaml_compatible_string(k) + ': ')
write_yaml(v, f, indent + ' ')
f.write(indent + _get_yaml_compatible_string(k) + ": ")
write_yaml(v, f, indent + " ")
if isinstance(v, (list, set, dict)):
f.write('\n')
f.write("\n")
else:
f.write(_get_yaml_compatible_string(data))
f.write('\n')
f.write("\n")
INCLUDE_STOP_AREAS = False
stops = set()
@@ -91,14 +91,14 @@ def dump_yaml(city, f):
[(sa.transfer or sa.id, sa.name) for sa in route.stop_areas()]
)
rte = {
'type': route.mode,
'ref': route.ref,
'name': route.name,
'colour': route.colour,
'infill': route.infill,
'station_count': len(stations),
'stations': list(stations.values()),
'itineraries': {},
"type": route.mode,
"ref": route.ref,
"name": route.name,
"colour": route.colour,
"infill": route.infill,
"station_count": len(stations),
"stations": list(stations.values()),
"itineraries": {},
}
for variant in route:
if INCLUDE_STOP_AREAS:
@@ -107,38 +107,38 @@ def dump_yaml(city, f):
s = st.stoparea
if s.id == s.station.id:
v_stops.append(
'{} ({})'.format(s.station.name, s.station.id)
"{} ({})".format(s.station.name, s.station.id)
)
else:
v_stops.append(
'{} ({}) in {} ({})'.format(
"{} ({}) in {} ({})".format(
s.station.name, s.station.id, s.name, s.id
)
)
else:
v_stops = [
'{} ({})'.format(
"{} ({})".format(
s.stoparea.station.name, s.stoparea.station.id
)
for s in variant
]
rte['itineraries'][variant.id] = v_stops
rte["itineraries"][variant.id] = v_stops
stops.update(v_stops)
routes.append(rte)
transfers = []
for t in city.transfers:
v_stops = ['{} ({})'.format(s.name, s.id) for s in t]
v_stops = ["{} ({})".format(s.name, s.id) for s in t]
transfers.append(sorted(v_stops))
result = {
'stations': sorted(stops),
'transfers': sorted(transfers, key=lambda t: t[0]),
'routes': sorted(routes, key=lambda r: r['ref']),
"stations": sorted(stops),
"transfers": sorted(transfers, key=lambda t: t[0]),
"routes": sorted(routes, key=lambda r: r["ref"]),
}
write_yaml(result, f)
def make_geojson(city, tracks=True):
def make_geojson(city, include_tracks_geometry=True):
transfers = set()
for t in city.transfers:
transfers.update(t)
@@ -147,36 +147,25 @@ def make_geojson(city, tracks=True):
stops = set()
for rmaster in city:
for variant in rmaster:
if not tracks:
features.append(
{
'type': 'Feature',
'geometry': {
'type': 'LineString',
'coordinates': [s.stop for s in variant],
},
'properties': {
'ref': variant.ref,
'name': variant.name,
'stroke': variant.colour,
},
}
)
elif variant.tracks:
features.append(
{
'type': 'Feature',
'geometry': {
'type': 'LineString',
'coordinates': variant.tracks,
},
'properties': {
'ref': variant.ref,
'name': variant.name,
'stroke': variant.colour,
},
}
)
tracks = (
variant.get_extended_tracks()
if include_tracks_geometry
else [s.stop for s in variant]
)
features.append(
{
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": tracks,
},
"properties": {
"ref": variant.ref,
"name": variant.name,
"stroke": variant.colour,
},
}
)
for st in variant:
stops.add(st.stop)
stopareas.add(st.stoparea)
@@ -184,41 +173,41 @@ def make_geojson(city, tracks=True):
for stop in stops:
features.append(
{
'type': 'Feature',
'geometry': {
'type': 'Point',
'coordinates': stop,
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": stop,
},
'properties': {
'marker-size': 'small',
'marker-symbol': 'circle',
"properties": {
"marker-size": "small",
"marker-symbol": "circle",
},
}
)
for stoparea in stopareas:
features.append(
{
'type': 'Feature',
'geometry': {
'type': 'Point',
'coordinates': stoparea.center,
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": stoparea.center,
},
'properties': {
'name': stoparea.name,
'marker-size': 'small',
'marker-color': '#ff2600'
"properties": {
"name": stoparea.name,
"marker-size": "small",
"marker-color": "#ff2600"
if stoparea in transfers
else '#797979',
else "#797979",
},
}
)
return {'type': 'FeatureCollection', 'features': features}
return {"type": "FeatureCollection", "features": features}
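A usage sketch for the function above, assuming city is a validated City instance (the output path is hypothetical):
import json

geojson = make_geojson(city, include_tracks_geometry=True)
with open("city.geojson", "w", encoding="utf-8") as f:
    json.dump(geojson, f, ensure_ascii=False)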
def _dumps_route_id(route_id):
"""Argument is a route_id that depends on route colour and ref. Name
can be taken from route_master or can be route's own, we don't take it
into consideration. Some of route attributes can be None. The function makes
"""Argument is a route_id that depends on route colour and ref. Name can
be taken from route_master or can be route's own, we don't take it into
consideration. Some of route attributes can be None. The function makes
route_id json-compatible - dumps it to a string."""
return json.dumps(route_id, ensure_ascii=False)
@@ -235,7 +224,7 @@ def read_recovery_data(path):
shuffled stations in routes."""
data = None
try:
with open(path, 'r') as f:
with open(path, "r") as f:
try:
data = json.load(f)
except json.decoder.JSONDecodeError as e:
@@ -269,21 +258,21 @@ def write_recovery_data(path, current_data, cities):
itineraries = []
for variant in route:
itin = {
'stations': [],
'name': variant.name,
'from': variant.element['tags'].get('from'),
'to': variant.element['tags'].get('to'),
"stations": [],
"name": variant.name,
"from": variant.element["tags"].get("from"),
"to": variant.element["tags"].get("to"),
}
for stop in variant:
station = stop.stoparea.station
station_name = station.name
if station_name == '?' and station.int_name:
if station_name == "?" and station.int_name:
station_name = station.int_name
itin['stations'].append(
itin["stations"].append(
{
'oms_id': station.id,
'name': station_name,
'center': station.center,
"oms_id": station.id,
"name": station_name,
"center": station.center,
}
)
if itin is not None:
@@ -293,7 +282,7 @@ def write_recovery_data(path, current_data, cities):
data = current_data
for city in cities:
if city.is_good():
if city.is_good:
data[city.name] = make_city_recovery_data(city)
try:
@@ -304,7 +293,7 @@ def write_recovery_data(path, current_data, cities):
}
for city_name, routes in data.items()
}
with open(path, 'w', encoding='utf-8') as f:
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
logging.warning("Cannot write recovery data to '%s': %s", path, str(e))

File diff suppressed because it is too large.

tests/__init__.py (new empty file)

@@ -0,0 +1,28 @@
{
"w38836456": {
"lat": 55.73064775,
"lon": 37.446065950000005
},
"w489951237": {
"lat": 55.730760724999996,
"lon": 37.44602055
},
"r7588527": {
"lat": 55.73066371666667,
"lon": 37.44604881666667
},
"r7588528": {
"lat": 55.73075192499999,
"lon": 37.44609837
},
"r7588561": {
"lat": 55.73070782083333,
"lon": 37.44607359333334
},
"r13426423": {
"lat": 55.730760724999996,
"lon": 37.44602055
},
"r100": null,
"r101": null
}


@@ -0,0 +1,82 @@
<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>
<node id='244036218' visible='true' version='27' lat='55.7306986' lon='37.4460134'>
<tag k='public_transport' v='stop_position' />
<tag k='subway' v='yes' />
</node>
<node id='244038961' visible='true' version='26' lat='55.730801' lon='37.4464724'>
<tag k='railway' v='subway_entrance' />
</node>
<node id='461075776' visible='true' version='5' lat='55.7304682' lon='37.4447392' />
<node id='461075811' visible='true' version='7' lat='55.7308273' lon='37.4473927'>
<tag k='barrier' v='gate' />
</node>
<node id='1191237441' visible='true' version='18' lat='55.7308185' lon='37.4459574'>
<tag k='public_transport' v='stop_position' />
<tag k='subway' v='yes' />
</node>
<node id='4821481210' visible='true' version='3' lat='55.7305372' lon='37.4447339' />
<node id='4821481211' visible='true' version='3' lat='55.7306293' lon='37.4446944' />
<node id='4821481212' visible='true' version='3' lat='55.7308921' lon='37.4473467' />
<node id='4821481213' visible='true' version='3' lat='55.7309843' lon='37.4473072' />
<node id='5176248500' visible='true' version='2' lat='55.7306626' lon='37.4460524'>
<tag k='public_transport' v='stop_position' />
<tag k='subway' v='yes' />
</node>
<node id='5176248502' visible='true' version='8' lat='55.7306808' lon='37.4460281'>
<tag k='name' v='Кунцевская' />
<tag k='railway' v='station' />
<tag k='station' v='subway' />
</node>
<way id='38836456' version='4' visible='true'>
<nd ref='461075776' />
<nd ref='461075811' />
<tag k='railway' v='platform' />
</way>
<way id='489951237' visible='true' version='6'>
<nd ref='4821481210' />
<nd ref='4821481211' />
<nd ref='4821481213' />
<nd ref='4821481212' />
<nd ref='4821481210' />
</way>
<relation id='7588527' visible='true' version='7'>
<member type='node' ref='5176248502' role='' />
<member type='node' ref='5176248500' role='stop' />
<member type='way' ref='38836456' role='' />
<tag k='public_transport' v='stop_area' />
<tag k='type' v='public_transport' />
</relation>
<relation id='7588528' visible='true' version='6'>
<member type='node' ref='5176248502' role='' />
<member type='node' ref='244036218' role='stop' />
<member type='node' ref='1191237441' role='stop' />
<member type='relation' ref='13426423' role='platform' />
<member type='node' ref='244038961' role='' />
<member type='relation' ref='7588561' role='' /> <!-- cyclic ref -->
<tag k='public_transport' v='stop_area' />
<tag k='type' v='public_transport' />
</relation>
<relation id='7588561' visible='true' version='5'>
<member type='relation' ref='7588528' role='' />
<member type='relation' ref='7588527' role='' />
<member type='node' ref='1' role='' /> <!-- incomplete ref -->
<member type='way' ref='1' role='' /> <!-- incomplete ref -->
<member type='relation' ref='1' role='' /> <!-- incomplete ref -->
<tag k='name' v='Кунцевская' />
<tag k='public_transport' v='stop_area_group' />
<tag k='type' v='public_transport' />
</relation>
<relation id='13426423' visible='true' version='4'>
<member type='way' ref='489951237' role='outer' />
<tag k='public_transport' v='platform' />
<tag k='type' v='multipolygon' />
</relation>
<relation id='100' visible='true' version='1'>
<tag k='description' v='empty relation' />
</relation>
<relation id='101' visible='true' version='1'>
<member type='node' ref='1' role='' /> <!-- incomplete ref -->
<tag k='description' v='only incomplete members' />
</relation>
</osm>

tests/sample_data.py (new file, 1491 lines; diff suppressed because it is too large)

tests/test_build_tracks.py (new file, 153 lines)

@@ -0,0 +1,153 @@
"""
To perform tests manually, run this command from the top directory
of the repository:
> python -m unittest discover tests
or simply
> python -m unittest
"""
import io
import unittest
from subway_structure import City
from subway_io import load_xml
from tests.sample_data import sample_networks
class TestOneRouteTracks(unittest.TestCase):
"""Test tracks extending and truncating on one-route networks"""
CITY_TEMPLATE = {
"id": 1,
"name": "Null Island",
"country": "World",
"continent": "Africa",
"num_stations": None, # Would be taken from the sample network data
"num_lines": 1,
"num_light_lines": 0,
"num_interchanges": 0,
"bbox": "-179, -89, 179, 89",
"networks": "",
}
def assertListAlmostEqual(self, list1, list2, places=10) -> None:
if not (isinstance(list1, list) and isinstance(list2, list)):
raise RuntimeError(
f"Not lists passed to the '{self.__class__.__name__}."
"assertListAlmostEqual' method"
)
self.assertEqual(len(list1), len(list2))
for a, b in zip(list1, list2):
if isinstance(a, list) and isinstance(b, list):
self.assertListAlmostEqual(a, b, places)
else:
self.assertAlmostEqual(a, b, places)
def prepare_city_routes(self, network) -> tuple:
city_data = self.CITY_TEMPLATE.copy()
city_data["num_stations"] = network["station_count"]
city = City(city_data)
elements = load_xml(io.BytesIO(network["xml"].encode("utf-8")))
for el in elements:
city.add(el)
city.extract_routes()
city.validate()
self.assertTrue(city.is_good)
route_master = list(city.routes.values())[0]
variants = route_master.routes
fwd_route = [v for v in variants if v.name == "Forward"][0]
bwd_route = [v for v in variants if v.name == "Backward"][0]
return fwd_route, bwd_route
def _test_tracks_extending_for_network(self, network_data):
fwd_route, bwd_route = self.prepare_city_routes(network_data)
self.assertEqual(
fwd_route.tracks,
network_data["tracks"],
"Wrong tracks",
)
extended_tracks = fwd_route.get_extended_tracks()
self.assertEqual(
extended_tracks,
network_data["extended_tracks"],
"Wrong tracks after extending",
)
self.assertEqual(
bwd_route.tracks,
network_data["tracks"][::-1],
"Wrong backward tracks",
)
extended_tracks = bwd_route.get_extended_tracks()
self.assertEqual(
extended_tracks,
network_data["extended_tracks"][::-1],
"Wrong backward tracks after extending",
)
def _test_tracks_truncating_for_network(self, network_data):
fwd_route, bwd_route = self.prepare_city_routes(network_data)
truncated_tracks = fwd_route.get_truncated_tracks(fwd_route.tracks)
self.assertEqual(
truncated_tracks,
network_data["truncated_tracks"],
"Wrong tracks after truncating",
)
truncated_tracks = bwd_route.get_truncated_tracks(bwd_route.tracks)
self.assertEqual(
truncated_tracks,
network_data["truncated_tracks"][::-1],
"Wrong backward tracks after truncating",
)
def _test_stop_positions_on_rails_for_network(self, network_data):
fwd_route, bwd_route = self.prepare_city_routes(network_data)
for route, route_label in zip(
(fwd_route, bwd_route), ("forward", "backward")
):
route_data = network_data[route_label]
for attr in (
"first_stop_on_rails_index",
"last_stop_on_rails_index",
):
self.assertEqual(
getattr(route, attr),
route_data[attr],
f"Wrong {attr} for {route_label} route",
)
first_ind = route_data["first_stop_on_rails_index"]
last_ind = route_data["last_stop_on_rails_index"]
positions_on_rails = [
rs.positions_on_rails
for rs in route.stops[first_ind : last_ind + 1] # noqa E203
]
self.assertListAlmostEqual(
positions_on_rails, route_data["positions_on_rails"]
)
def test_tracks_extending(self) -> None:
for network_name, network_data in sample_networks.items():
with self.subTest(msg=network_name):
self._test_tracks_extending_for_network(network_data)
def test_tracks_truncating(self) -> None:
for network_name, network_data in sample_networks.items():
with self.subTest(msg=network_name):
self._test_tracks_truncating_for_network(network_data)
def test_stop_position_on_rails(self) -> None:
for network_name, network_data in sample_networks.items():
with self.subTest(msg=network_name):
self._test_stop_positions_on_rails_for_network(network_data)


@@ -0,0 +1,55 @@
import json
from pathlib import Path
from unittest import TestCase
from process_subways import calculate_centers
from subway_io import load_xml
class TestCenterCalculation(TestCase):
"""Test center calculation. Test data [should] contain among others
the following edge cases:
- an empty relation. It's element should not obtain "center" key.
- relation as member of relation, the child relation following the parent
in the OSM XML file.
- relation with incomplete members (broken references).
- relations with cyclic references.
"""
ASSETS_PATH = Path(__file__).resolve().parent / "assets"
OSM_DATA = str(ASSETS_PATH / "kuntsevskaya_transfer.osm")
CORRECT_CENTERS = str(ASSETS_PATH / "kuntsevskaya_centers.json")
def test__calculate_centers(self) -> None:
elements = load_xml(self.OSM_DATA)
calculate_centers(elements)
elements_dict = {
f"{'w' if el['type'] == 'way' else 'r'}{el['id']}": el
for el in elements
}
calculated_centers = {
k: el["center"]
for k, el in elements_dict.items()
if "center" in el
}
with open(self.CORRECT_CENTERS) as f:
correct_centers = json.load(f)
self.assertTrue(set(calculated_centers).issubset(correct_centers))
for k, correct_center in correct_centers.items():
if correct_center is None:
self.assertNotIn("center", elements_dict[k])
else:
self.assertIn(k, calculated_centers)
calculated_center = calculated_centers[k]
self.assertAlmostEqual(
calculated_center["lat"], correct_center["lat"], places=10
)
self.assertAlmostEqual(
calculated_center["lon"], correct_center["lon"], places=10
)


@@ -0,0 +1,96 @@
from unittest import TestCase
from processors.gtfs import (
dict_to_row,
GTFS_COLUMNS,
)
class TestGTFS(TestCase):
"""Test processors/gtfs.py"""
def test__dict_to_row__Nones_and_absent_keys(self) -> None:
"""Test that absent or None values in a GTFS feature item
are converted by the dict_to_row() function to the right number
of empty strings.
"""
if GTFS_COLUMNS["trips"][:3] != ["route_id", "service_id", "trip_id"]:
raise RuntimeError("GTFS column names/order inconsistency")
test_trips = [
{
"description": "Absent keys",
"trip_data": {
"route_id": 1,
"service_id": "a",
"trip_id": "tr_123",
},
},
{
"description": "None or absent keys",
"trip_data": {
"route_id": 1,
"service_id": "a",
"trip_id": "tr_123",
"trip_headsign": None,
"trip_short_name": None,
"route_pattern_id": None,
},
},
{
"description": "None, empty-string or absent keys",
"trip_data": {
"route_id": 1,
"service_id": "a",
"trip_id": "tr_123",
"trip_headsign": "",
"trip_short_name": "",
"route_pattern_id": None,
},
},
]
answer = [1, "a", "tr_123"] + [""] * (len(GTFS_COLUMNS["trips"]) - 3)
for test_trip in test_trips:
with self.subTest(msg=test_trip["description"]):
self.assertListEqual(
dict_to_row(test_trip["trip_data"], "trips"), answer
)
def test__dict_to_row__numeric_values(self) -> None:
"""Test that zero numeric values remain zeros in dict_to_row()
function, and not empty strings or None.
"""
shapes = [
{
"description": "Numeric non-zeroes",
"shape_data": {
"shape_id": 1,
"shape_pt_lat": 55.3242425,
"shape_pt_lon": -179.23242,
"shape_pt_sequence": 133,
"shape_dist_traveled": 1.2345,
},
"answer": [1, 55.3242425, -179.23242, 133, 1.2345],
},
{
"description": "Numeric zeroes and None keys",
"shape_data": {
"shape_id": 0,
"shape_pt_lat": 0.0,
"shape_pt_lon": 0,
"shape_pt_sequence": 0,
"shape_dist_traveled": None,
},
"answer": [0, 0.0, 0, 0, ""],
},
]
for shape in shapes:
with self.subTest(shape["description"]):
self.assertListEqual(
dict_to_row(shape["shape_data"], "shapes"), shape["answer"]
)

tests/test_projection.py (new file, 160 lines)

@@ -0,0 +1,160 @@
import collections
import itertools
import unittest
from subway_structure import project_on_segment
class TestProjection(unittest.TestCase):
"""Test subway_structure.project_on_segment function"""
PRECISION = 10 # decimal places in assertAlmostEqual
SHIFT = 1e-6 # Small distance between projected point and segment endpoint
def _test_projection_in_bulk(self, points, segments, answers):
"""Test 'project_on_segment' function for array of points and array
of parallel segments projections on which are equal.
"""
for point, ans in zip(points, answers):
for seg in segments:
for segment, answer in zip(
(seg, seg[::-1]),  # What if we invert the segment?
(ans, None if ans is None else 1 - ans),
):
u = project_on_segment(point, segment[0], segment[1])
if answer is None:
self.assertIsNone(
u,
f"Project of point {point} onto segment {segment} "
f"should be None, but {u} returned",
)
else:
self.assertAlmostEqual(
u,
answer,
self.PRECISION,
f"Wrong projection of point {point} onto segment "
f"{segment}: {u} returned, {answer} expected",
)
def test_projection_on_horizontal_segments(self):
points = [
(-2, 0),
(-1 - self.SHIFT, 0),
(-1, 0),
(-1 + self.SHIFT, 0),
(-0.5, 0),
(0, 0),
(0.5, 0),
(1 - self.SHIFT, 0),
(1, 0),
(1 + self.SHIFT, 0),
(2, 0),
]
horizontal_segments = [
((-1, -1), (1, -1)),
((-1, 0), (1, 0)),
((-1, 1), (1, 1)),
]
answers = [
None,
None,
0,
self.SHIFT / 2,
0.25,
0.5,
0.75,
1 - self.SHIFT / 2,
1,
None,
None,
]
self._test_projection_in_bulk(points, horizontal_segments, answers)
def test_projection_on_vertical_segments(self):
points = [
(0, -2),
(0, -1 - self.SHIFT),
(0, -1),
(0, -1 + self.SHIFT),
(0, -0.5),
(0, 0),
(0, 0.5),
(0, 1 - self.SHIFT),
(0, 1),
(0, 1 + self.SHIFT),
(0, 2),
]
vertical_segments = [
((-1, -1), (-1, 1)),
((0, -1), (0, 1)),
((1, -1), (1, 1)),
]
answers = [
None,
None,
0,
self.SHIFT / 2,
0.25,
0.5,
0.75,
1 - self.SHIFT / 2,
1,
None,
None,
]
self._test_projection_in_bulk(points, vertical_segments, answers)
def test_projection_on_inclined_segment(self):
points = [
(-2, -2),
(-1, -1),
(-0.5, -0.5),
(0, 0),
(0.5, 0.5),
(1, 1),
(2, 2),
]
segments = [
((-2, 0), (0, 2)),
((-1, -1), (1, 1)),
((0, -2), (2, 0)),
]
answers = [None, 0, 0.25, 0.5, 0.75, 1, None]
self._test_projection_in_bulk(points, segments, answers)
def test_projection_with_different_collections(self):
"""The tested function should accept points as any consecutive
container with index operator.
"""
types = (
tuple,
list,
collections.deque,
)
point = (0, 0.5)
segment_end1 = (0, 0)
segment_end2 = (1, 0)
for p_type, s1_type, s2_type in itertools.product(types, types, types):
p = p_type(point)
s1 = s1_type(segment_end1)
s2 = s2_type(segment_end2)
project_on_segment(p, s1, s2)
def test_projection_on_degenerate_segment(self):
coords = [-1, 0, 1]
points = [(x, y) for x, y in itertools.product(coords, coords)]
segments = [
((0, 0), (0, 0)),
((0, 0), (0, 1e-8)),
]
answers = [None] * len(points)
self._test_projection_in_bulk(points, segments, answers)
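For reference, a sketch of the behaviour these tests pin down; it is consistent with the cases above but is not necessarily the project's implementation, and the degeneracy threshold is an assumption:
def project_on_segment_sketch(point, s1, s2):
    """Return the position u in [0, 1] of the projection of point
    onto the segment s1-s2, or None if the projection falls outside
    the segment or the segment is (nearly) degenerate."""
    dx, dy = s2[0] - s1[0], s2[1] - s1[1]
    seg_len_squared = dx * dx + dy * dy
    if seg_len_squared < 1e-14:  # assumed degeneracy threshold
        return None
    u = ((point[0] - s1[0]) * dx + (point[1] - s1[1]) * dy) / seg_len_squared
    return u if 0 <= u <= 1 else None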


@@ -1,7 +1,19 @@
validator_osm_wiki_url = (
"https://wiki.openstreetmap.org/wiki/Quality_assurance#subway-preprocessor"
)
github_url = "https://github.com/alexey-zakharenkov/subways"
produced_by = f"""Produced by
<a href="{github_url}">Subway Preprocessor</a> on {{date}}"""
metro_mapping_osm_article = "https://wiki.openstreetmap.org/wiki/Metro_Mapping"
list_of_metro_systems_url = (
"https://en.wikipedia.org/wiki/List_of_metro_systems#List"
)
# These are templates for validation_to_html.py
# Variables should be in curly braces
STYLE = '''
STYLE = """
<style>
body {
font-family: sans-serif;
@@ -98,7 +110,7 @@ td > div {
}
.tooltip:hover:before,.tooltip:hover:after {
opacity: 1;
visibility: visible
visibility: visible
}
footer {
background: white;
@@ -108,30 +120,31 @@ footer {
position: sticky;
}
</style>
'''
"""
INDEX_HEADER = '''
INDEX_HEADER = f"""
<!doctype html>
<html>
<head>
<title>Subway Validator</title>
<meta charset="utf-8">
(s)
{STYLE}
</head>
<body>
<main>
<h1>Subway Validation Results</h1>
<p><b>{good_cities}</b> of <b>{total_cities}</b> networks validated without errors.
To make a network validate successfully please follow the
<a href="https://wiki.openstreetmap.org/wiki/Metro_Mapping">metro mapping instructions</a>.
Commit your changes to the OSM and then check back to the updated validation results after the next validation cycle, please.
See <a href="https://wiki.openstreetmap.org/wiki/Quality_assurance#subway-preprocessor">the validator instance&#0040;s&#0041; description</a>
for the schedule and capabilities.</p>
<p><b>{{good_cities}}</b> of <b>{{total_cities}}</b> networks validated without
errors. To make a network validate successfully, please follow the
<a href="{metro_mapping_osm_article}">metro mapping
instructions</a>. Commit your changes to OSM, then check back for the
updated validation results after the next validation cycle.
See <a href="{validator_osm_wiki_url}">the validator instance&#0040;s&#0041;
description</a> for the schedule and capabilities.</p>
<p><a href="render.html">View networks on a map</a></p>
<table cellspacing="3" cellpadding="2" style="margin-bottom: 1em;">
'''.replace('(s)', STYLE)
"""
INDEX_CONTINENT = '''
INDEX_CONTINENT = """
<tr><td colspan="9">&nbsp;</td></tr>
<tr>
<th>Continent</th>
@@ -157,9 +170,9 @@ INDEX_CONTINENT = '''
<td class="color{=notices}">{num_notices}</td>
</tr>
{content}
'''
"""
INDEX_COUNTRY = '''
INDEX_COUNTRY = """
<tr>
<td>&nbsp;</td>
<td class="bold color{=cities}"><a href="{file}">{country}</a></td>
@@ -172,56 +185,57 @@ INDEX_COUNTRY = '''
<td class="color{=warnings}">{num_warnings}</td>
<td class="color{=notices}">{num_notices}</td>
</tr>
'''
"""
INDEX_FOOTER = '''
INDEX_FOOTER = f"""
</table>
</main>
<footer>Produced by <a href="https://github.com/alexey-zakharenkov/subways">Subway Preprocessor</a> on {date}.
See <a href="{google}">this spreadsheet</a> for the reference metro statistics and
<a href="https://en.wikipedia.org/wiki/List_of_metro_systems#List">this wiki page</a> for a list
of all metro systems.</footer>
<footer>{produced_by}
from <a href="{{cities_info_url}}">these reference metro statistics</a>. See
<a href="{list_of_metro_systems_url}">
this wiki page</a> for a list of all metro systems.</footer>
</body>
</html>
'''
"""
COUNTRY_HEADER = '''
COUNTRY_HEADER = f"""
<!doctype html>
<html>
<head>
<title>Subway Validator: {country}</title>
<title>Subway Validator: {{country}}</title>
<meta charset="utf-8">
(s)
{STYLE}
</head>
<body>
<main>
<h1>Subway Validation Results for {country}</h1>
<h1>Subway Validation Results for {{country}}</h1>
<p><a href="index.html">Return to the countries list</a>.</p>
<table cellspacing="3" cellpadding="2">
<tr>
<th>City</th>
{?subways}
{{?subways}}
<th>Subway Lines</th>
<th>Light Rail Lines</th>
{end}{?overground}
{{end}}{{?overground}}
<th>Tram Lines</th>
<th>Bus Lines</th>
<th>T-Bus Lines</th>
<th>Other Lines</th>
{end}
{{end}}
<th>Stations</th>
<th>Interchanges</th>
<th>Unused Entrances</th>
</tr>
'''.replace('(s)', STYLE)
"""
COUNTRY_CITY = '''
COUNTRY_CITY = """
<tr id="{slug}">
<td class="bold color{good_cities}">
{city}
{?yaml}<a href="{yaml}" class="hlink" title="Download YAML">Y</a>{end}
{?json}<a href="{json}" class="hlink" title="Download GeoJSON">J</a>{end}
{?json}<a href="render.html#{slug}" class="hlink" title="View map" target="_blank">M</a>{end}
{?json}<a href="render.html#{slug}" class="hlink" title="View map"
target="_blank">M</a>{end}
</td>
{?subways}
<td class="color{=subwayl}">sub: {subwayl_found} / {subwayl_expected}</td>
@@ -229,36 +243,55 @@ COUNTRY_CITY = '''
{end}{?overground}
<td class="color{=traml}">t: {traml_found} / {traml_expected}</td>
<td class="color{=busl}">b: {busl_found} / {busl_expected}</td>
<td class="color{=trolleybusl}">tb: {trolleybusl_found} / {trolleybusl_expected}</td>
<td class="color{=trolleybusl}">
tb: {trolleybusl_found} / {trolleybusl_expected}
</td>
<td class="color{=otherl}">o: {otherl_found} / {otherl_expected}</td>
{end}
<td class="color{=stations}">st: {stations_found} / {stations_expected}</td>
<td class="color{=transfers}">int: {transfers_found} / {transfers_expected}</td>
<td class="color{=transfers}">
int: {transfers_found} / {transfers_expected}
</td>
<td class="color{=entrances}">ent: {unused_entrances}</td>
</tr>
<tr><td colspan="{?subways}6{end}{?overground}8{end}">
{?errors}
<div class="errors"><div data-text="Network is invalid and not suitable for routing." class="tooltip">🛑 Errors</div>
<div class="errors">
<div
data-text="Network is invalid and not suitable for routing."
class="tooltip">
🛑 Errors
</div>
{errors}
</div>
{end}
{?warnings}
<div class="warnings"><div data-text="Problematic data but it's still possible to build routes." class="tooltip"> Warnings</div>
<div class="warnings">
<div
data-text="Problematic data but it's still possible to build routes."
class="tooltip">
Warnings
</div>
{warnings}
</div>
{end}
{?notices}
<div class="notices"><div data-text="Suspicious condition but not necessarily an error." class="tooltip"> Notices</div>
<div class="notices">
<div
data-text="Suspicious condition but not necessarily an error."
class="tooltip">
Notices
</div>
{notices}
{end}
</div>
</td></tr>
'''
"""
COUNTRY_FOOTER = '''
COUNTRY_FOOTER = f"""
</table>
</main>
<footer>Produced by <a href="https://github.com/alexey-zakharenkov/subways">Subway Preprocessor</a> on {date}.</footer>
<footer>{produced_by}.</footer>
</body>
</html>
'''
"""

validation_to_html.py

@@ -1,204 +1,229 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import datetime
import re
import os
import sys
import json
from subway_structure import SPREADSHEET_ID
from v2h_templates import *
import os
import re
from collections import defaultdict
from typing import Any, Optional
from process_subways import DEFAULT_SPREADSHEET_ID
from v2h_templates import (
COUNTRY_CITY,
COUNTRY_FOOTER,
COUNTRY_HEADER,
INDEX_CONTINENT,
INDEX_COUNTRY,
INDEX_FOOTER,
INDEX_HEADER,
)
class CityData:
def __init__(self, city=None):
def __init__(self, city: Optional[str] = None) -> None:
self.city = city is not None
self.data = {
'good_cities': 0,
'total_cities': 1 if city else 0,
'num_errors': 0,
'num_warnings': 0,
'num_notices': 0
"good_cities": 0,
"total_cities": 1 if city else 0,
"num_errors": 0,
"num_warnings": 0,
"num_notices": 0,
}
self.slug = None
if city:
self.slug = city['slug']
self.country = city['country']
self.continent = city['continent']
self.errors = city['errors']
self.warnings = city['warnings']
self.notices = city['notices']
self.slug = city["slug"]
self.country = city["country"]
self.continent = city["continent"]
self.errors = city["errors"]
self.warnings = city["warnings"]
self.notices = city["notices"]
if not self.errors:
self.data['good_cities'] = 1
self.data['num_errors'] = len(self.errors)
self.data['num_warnings'] = len(self.warnings)
self.data['num_notices'] = len(self.notices)
self.data["good_cities"] = 1
self.data["num_errors"] = len(self.errors)
self.data["num_warnings"] = len(self.warnings)
self.data["num_notices"] = len(self.notices)
for k, v in city.items():
if 'found' in k or 'expected' in k or 'unused' in k:
if "found" in k or "expected" in k or "unused" in k:
self.data[k] = v
def not__get__(self, i):
return self.data.get(i)
def not__set__(self, i, value):
self.data[i] = value
def __add__(self, other):
def __add__(self, other: CityData) -> CityData:
d = CityData()
for k in set(self.data.keys()) | set(other.data.keys()):
d.data[k] = self.data.get(k, 0) + other.data.get(k, 0)
return d
def format(self, s):
def test_eq(v1, v2):
return '1' if v1 == v2 else '0'
@staticmethod
def test_eq(v1: Any, v2: Any) -> str:
return "1" if v1 == v2 else "0"
def format(self, s: str) -> str:
for k in self.data:
s = s.replace('{' + k + '}', str(self.data[k]))
s = s.replace('{slug}', self.slug or '')
s = s.replace("{" + k + "}", str(self.data[k]))
s = s.replace("{slug}", self.slug or "")
for k in (
'subwayl',
'lightrl',
'stations',
'transfers',
'busl',
'trolleybusl',
'traml',
'otherl',
"subwayl",
"lightrl",
"stations",
"transfers",
"busl",
"trolleybusl",
"traml",
"otherl",
):
if k + '_expected' in self.data:
if k + "_expected" in self.data:
s = s.replace(
'{=' + k + '}',
test_eq(
self.data[k + '_found'], self.data[k + '_expected']
"{=" + k + "}",
self.test_eq(
self.data[k + "_found"], self.data[k + "_expected"]
),
)
s = s.replace(
'{=cities}',
test_eq(self.data['good_cities'], self.data['total_cities']),
"{=cities}",
self.test_eq(self.data["good_cities"], self.data["total_cities"]),
)
s = s.replace(
'{=entrances}', test_eq(self.data['unused_entrances'], 0)
"{=entrances}", self.test_eq(self.data["unused_entrances"], 0)
)
for k in ('errors', 'warnings', 'notices'):
s = s.replace('{=' + k + '}', test_eq(self.data['num_' + k], 0))
for k in ("errors", "warnings", "notices"):
s = s.replace(
"{=" + k + "}", self.test_eq(self.data["num_" + k], 0)
)
return s
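
The {=key} placeholders that format() resolves drive the row colouring in the templates: "1" when a found counter matches its expected value (or an error counter is zero), "0" otherwise. A hypothetical illustration (the counter values are invented):

    cd = CityData()
    cd.data.update(
        stations_found=10, stations_expected=10, num_errors=1, unused_entrances=0
    )
    cd.format('<td class="color{=stations}"></td><td class="color{=errors}"></td>')
    # -> color1 for stations (10 == 10), color0 for errors (num_errors != 0)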
def tmpl(s, data=None, **kwargs):
def tmpl(s: str, data: Optional[CityData] = None, **kwargs) -> str:
if data:
s = data.format(s)
if kwargs:
for k, v in kwargs.items():
if v is not None:
s = s.replace('{' + k + '}', str(v))
s = s.replace("{" + k + "}", str(v))
s = re.sub(
r'\{\?' + k + r'\}(.+?)\{end\}',
r'\1' if v else '',
r"\{\?" + k + r"\}(.+?)\{end\}",
r"\1" if v else "",
s,
flags=re.DOTALL,
)
s = s.replace('{date}', date)
google_url = (
'https://docs.google.com/spreadsheets/d/{}/edit?usp=sharing'.format(
SPREADSHEET_ID
)
)
s = s.replace('{google}', google_url)
return s
EXPAND_OSM_TYPE = {'n': 'node', 'w': 'way', 'r': 'relation'}
RE_SHORT = re.compile(r'\b([nwr])(\d+)\b')
RE_FULL = re.compile(r'\b(node|way|relation) (\d+)\b')
RE_COORDS = re.compile(r'\((-?\d+\.\d+), (-?\d+\.\d+)\)')
EXPAND_OSM_TYPE = {"n": "node", "w": "way", "r": "relation"}
RE_SHORT = re.compile(r"\b([nwr])(\d+)\b")
RE_FULL = re.compile(r"\b(node|way|relation) (\d+)\b")
RE_COORDS = re.compile(r"\((-?\d+\.\d+), (-?\d+\.\d+)\)")
def osm_links(s):
def osm_links(s: str) -> str:
"""Converts object mentions to HTML links."""
def link(m):
return '<a href="https://www.openstreetmap.org/{}/{}">{}</a>'.format(
EXPAND_OSM_TYPE[m.group(1)[0]], m.group(2), m.group(0)
def link(m: re.Match) -> str:
osm_type = EXPAND_OSM_TYPE[m.group(1)[0]]
osm_id = m.group(2)
return (
'<a href="https://www.openstreetmap.org/'
f'{osm_type}/{osm_id}">{m.group(0)}</a>'
)
s = RE_SHORT.sub(link, s)
s = RE_FULL.sub(link, s)
s = RE_COORDS.sub(
r'(<a href="https://www.openstreetmap.org/search?query=\2%2C\1#map=18/\2/\1">pos</a>)',
r'(<a href="https://www.openstreetmap.org/search?'
r'query=\2%2C\1#map=18/\2/\1">pos</a>)',
s,
)
return s
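
osm_links() linkifies three mention styles: short ids like n123/w123/r123, full "node 123" style ids, and coordinate pairs, which the validator apparently prints as (lon, lat) given that the substitution swaps the groups into OSM's lat,lon search order. An example expansion (the message text is hypothetical):

    osm_links("Stop n240981497 is nowhere near the tracks (13.4050, 52.5200)")
    # n240981497         -> <a href="https://www.openstreetmap.org/node/240981497">n240981497</a>
    # (13.4050, 52.5200) -> (<a href="https://www.openstreetmap.org/search?
    #                        query=52.5200%2C13.4050#map=18/52.5200/13.4050">pos</a>)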
def esc(s):
return s.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
def esc(s: str) -> str:
return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
if len(sys.argv) < 2:
print('Reads a log from subway validator and prepares HTML files.')
print(
'Usage: {} <validation.log> [<target_directory>]'.format(sys.argv[0])
def br_osm_links(elems: list) -> str:
return "<br>".join(osm_links(esc(elem)) for elem in elems)
def main() -> None:
parser = argparse.ArgumentParser(
description=(
"Reads a log from subway validator and prepares HTML files."
)
)
sys.exit(1)
parser.add_argument("validation_log")
parser.add_argument("target_directory", nargs="?", default=".")
parser.add_argument(
"--cities-info-url",
default=(
"https://docs.google.com/spreadsheets/d/"
f"{DEFAULT_SPREADSHEET_ID}/edit?usp=sharing"
),
)
options = parser.parse_args()
target_dir = options.target_directory
cities_info_url = options.cities_info_url
with open(sys.argv[1], 'r', encoding='utf-8') as f:
data = {c['name']: CityData(c) for c in json.load(f)}
with open(options.validation_log, encoding="utf-8") as f:
data = {c["name"]: CityData(c) for c in json.load(f)}
countries = {}
continents = {}
c_by_c = {} # continent → set of countries
for c in data.values():
countries[c.country] = c + countries.get(c.country, CityData())
continents[c.continent] = c + continents.get(c.continent, CityData())
if c.continent not in c_by_c:
c_by_c[c.continent] = set()
c_by_c[c.continent].add(c.country)
world = sum(continents.values(), CityData())
countries = {}
continents = {}
c_by_c = defaultdict(set) # continent → set of countries
for c in data.values():
countries[c.country] = c + countries.get(c.country, CityData())
continents[c.continent] = c + continents.get(c.continent, CityData())
c_by_c[c.continent].add(c.country)
world = sum(continents.values(), CityData())
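
Aggregation relies on CityData.__add__ summing counters key-wise, with a fresh CityData() (total_cities == 0) acting as the neutral start value for sum(). A toy check with invented counters:

    a, b = CityData(), CityData()
    a.data["num_errors"], b.data["num_errors"] = 2, 3
    assert sum([a, b], CityData()).data["num_errors"] == 5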
overground = 'traml_expected' in next(iter(data.values())).data
date = datetime.datetime.utcnow().strftime('%d.%m.%Y %H:%M UTC')
path = '.' if len(sys.argv) < 3 else sys.argv[2]
index = open(os.path.join(path, 'index.html'), 'w', encoding='utf-8')
index.write(tmpl(INDEX_HEADER, world))
overground = "traml_expected" in next(iter(data.values())).data
date = datetime.datetime.utcnow().strftime("%d.%m.%Y %H:%M UTC")
index = open(os.path.join(target_dir, "index.html"), "w", encoding="utf-8")
index.write(tmpl(INDEX_HEADER, world))
for continent in sorted(continents.keys()):
content = ''
for country in sorted(c_by_c[continent]):
country_file_name = country.lower().replace(' ', '-') + '.html'
content += tmpl(
INDEX_COUNTRY,
countries[country],
file=country_file_name,
country=country,
continent=continent,
)
country_file = open(
os.path.join(path, country_file_name), 'w', encoding='utf-8'
)
country_file.write(
tmpl(
COUNTRY_HEADER,
for continent in sorted(continents.keys()):
content = ""
for country in sorted(c_by_c[continent]):
country_file_name = country.lower().replace(" ", "-") + ".html"
content += tmpl(
INDEX_COUNTRY,
countries[country],
file=country_file_name,
country=country,
continent=continent,
overground=overground,
subways=not overground,
)
)
for name, city in sorted(data.items()):
if city.country == country:
file_base = os.path.join(path, city.slug)
country_file = open(
os.path.join(target_dir, country_file_name),
"w",
encoding="utf-8",
)
country_file.write(
tmpl(
COUNTRY_HEADER,
country=country,
continent=continent,
overground=overground,
subways=not overground,
)
)
for name, city in sorted(
(name, city)
for name, city in data.items()
if city.country == country
):
file_base = os.path.join(target_dir, city.slug)
yaml_file = (
city.slug + '.yaml'
if os.path.exists(file_base + '.yaml')
city.slug + ".yaml"
if os.path.exists(file_base + ".yaml")
else None
)
json_file = (
city.slug + '.geojson'
if os.path.exists(file_base + '.geojson')
city.slug + ".geojson"
if os.path.exists(file_base + ".geojson")
else None
)
errors = '<br>'.join([osm_links(esc(e)) for e in city.errors])
warnings = '<br>'.join([osm_links(esc(w)) for w in city.warnings])
notices = '<br>'.join([osm_links(esc(n)) for n in city.notices])
errors = br_osm_links(city.errors)
warnings = br_osm_links(city.warnings)
notices = br_osm_links(city.notices)
country_file.write(
tmpl(
COUNTRY_CITY,
@@ -215,18 +240,27 @@ for continent in sorted(continents.keys()):
overground=overground,
)
)
country_file.write(
tmpl(COUNTRY_FOOTER, country=country, continent=continent)
country_file.write(
tmpl(
COUNTRY_FOOTER,
country=country,
continent=continent,
date=date,
)
)
country_file.close()
index.write(
tmpl(
INDEX_CONTINENT,
continents[continent],
content=content,
continent=continent,
)
)
country_file.close()
index.write(
tmpl(
INDEX_CONTINENT,
continents[continent],
content=content,
continent=continent,
)
)
index.write(tmpl(INDEX_FOOTER))
index.close()
index.write(tmpl(INDEX_FOOTER, date=date, cities_info_url=cities_info_url))
index.close()
if __name__ == "__main__":
main()
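
Given the argparse interface above, a typical invocation might look like this (the paths and the spreadsheet id are placeholders):

    python3 validation_to_html.py validation.log html/ \
        --cities-info-url "https://docs.google.com/spreadsheets/d/<id>/edit?usp=sharing"

Omitting --cities-info-url falls back to the Google Sheets URL built from DEFAULT_SPREADSHEET_ID, and omitting target_directory writes the HTML files to the current directory.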