Run the code through flake8 and black

Alexey Zakharenkov 2022-12-17 23:39:48 +03:00 committed by Alexey Zakharenkov
parent 86e65d2115
commit 95d0d0d0fd
22 changed files with 1377 additions and 1273 deletions

View file

@@ -26,14 +26,14 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8
pip install flake8 black
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
flake8
- name: Check with black
run: |
black --check --line-length 79 .
- name: Test with unittest
run: |
python -m unittest discover tests
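The lint step now calls bare flake8, so the options that used to be spelled out on the command line (error selection, max complexity, line length) have presumably moved into a repository-level config that this excerpt does not show. A minimal sketch of such a config, assuming a hypothetical setup.cfg, consistent with the old flags and the 79-character limit passed to black:

    [flake8]
    max-line-length = 79
    max-complexity = 10

With that in place, a plain "flake8" run picks up the limits automatically and stays consistent with the black check step.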

View file

@@ -1,6 +1,6 @@
import functools
import logging
import math
import functools
"""A coordinate of a station precision of which we must take into account
@@ -16,42 +16,48 @@ def coords_eq(lon1, lat1, lon2, lat2):
def osm_id_comparator(el):
"""This function is used as key for sorting lists of
OSM-originated objects
OSM-originated objects
"""
return (el['osm_type'], el['osm_id'])
return (el["osm_type"], el["osm_id"])
def itinerary_comparator(itinerary):
"This function is used as key for sorting itineraries in a route"""
return (itinerary['stops'], itinerary['interval'])
"""This function is used as key for sorting itineraries in a route"""
return (itinerary["stops"], itinerary["interval"])
def compare_stops(stop0, stop1):
"""Compares json of two stops in route"""
stop_keys = ('name', 'int_name', 'id', 'osm_id', 'osm_type')
stop_keys = ("name", "int_name", "id", "osm_id", "osm_type")
stop0_props = tuple(stop0[k] for k in stop_keys)
stop1_props = tuple(stop1[k] for k in stop_keys)
if stop0_props != stop1_props:
logging.debug("Different stops properties: %s, %s",
stop0_props, stop1_props)
logging.debug(
"Different stops properties: %s, %s", stop0_props, stop1_props
)
return False
if not coords_eq(stop0['lon'], stop0['lat'],
stop1['lon'], stop1['lat']):
logging.debug("Different stops coordinates: %s (%f, %f), %s (%f, %f)",
stop0_props, stop0['lon'], stop0['lat'],
stop1_props, stop1['lon'], stop1['lat'])
if not coords_eq(stop0["lon"], stop0["lat"], stop1["lon"], stop1["lat"]):
logging.debug(
"Different stops coordinates: %s (%f, %f), %s (%f, %f)",
stop0_props,
stop0["lon"],
stop0["lat"],
stop1_props,
stop1["lon"],
stop1["lat"],
)
return False
entrances0 = sorted(stop0['entrances'], key=osm_id_comparator)
entrances1 = sorted(stop1['entrances'], key=osm_id_comparator)
entrances0 = sorted(stop0["entrances"], key=osm_id_comparator)
entrances1 = sorted(stop1["entrances"], key=osm_id_comparator)
if entrances0 != entrances1:
logging.debug("Different stop entrances")
return False
exits0 = sorted(stop0['exits'], key=osm_id_comparator)
exits1 = sorted(stop1['exits'], key=osm_id_comparator)
exits0 = sorted(stop0["exits"], key=osm_id_comparator)
exits1 = sorted(stop1["exits"], key=osm_id_comparator)
if exits0 != exits1:
logging.debug("Different stop exits")
return False
@@ -61,21 +67,24 @@ def compare_stops(stop0, stop1):
def compare_transfers(transfers0, transfers1):
"""Compares two arrays of transfers of the form
[(stop1_uid, stop2_uid, time), ...]
[(stop1_uid, stop2_uid, time), ...]
"""
if len(transfers0) != len(transfers1):
logging.debug("Different len(transfers): %d != %d",
len(transfers0), len(transfers1))
logging.debug(
"Different len(transfers): %d != %d",
len(transfers0),
len(transfers1),
)
return False
transfers0 = [tuple([t[0], t[1], t[2]])
if t[0] < t[1] else
tuple([t[1], t[0], t[2]])
for t in transfers0]
transfers1 = [tuple([t[0], t[1], t[2]])
if t[0] < t[1] else
tuple([t[1], t[0], t[2]])
for t in transfers1]
transfers0 = [
tuple([t[0], t[1], t[2]]) if t[0] < t[1] else tuple([t[1], t[0], t[2]])
for t in transfers0
]
transfers1 = [
tuple([t[0], t[1], t[2]]) if t[0] < t[1] else tuple([t[1], t[0], t[2]])
for t in transfers1
]
transfers0.sort()
transfers1.sort()
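The reformatted comprehensions keep the original normalization intact: each (uid1, uid2, time) triple is reordered so that the smaller stop uid comes first, which makes the sort and the later element-wise comparison independent of the order in which each transfer was recorded. With made-up uids:

    (520, 310, 90)  ->  (310, 520, 90)
    (310, 520, 90)  ->  (310, 520, 90)

After normalization and sorting, two lists describing the same transfers compare equal pair by pair.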
@@ -84,8 +93,9 @@ def compare_transfers(transfers0, transfers1):
for tr0, tr1 in zip(transfers0, transfers1):
if tr0 != tr1:
if diff_cnt == 0:
logging.debug("First pair of different transfers: %s, %s",
tr0, tr1)
logging.debug(
"First pair of different transfers: %s, %s", tr0, tr1
)
diff_cnt += 1
if diff_cnt:
logging.debug("Different transfers number = %d", diff_cnt)
@@ -95,46 +105,55 @@ def compare_transfers(transfers0, transfers1):
def compare_networks(network0, network1):
if network0['agency_id'] != network1['agency_id']:
logging.debug("Different agency_id at route '%s'",
network0['network'])
if network0["agency_id"] != network1["agency_id"]:
logging.debug("Different agency_id at route '%s'", network0["network"])
return False
route_ids0 = sorted(x['route_id'] for x in network0['routes'])
route_ids1 = sorted(x['route_id'] for x in network1['routes'])
route_ids0 = sorted(x["route_id"] for x in network0["routes"])
route_ids1 = sorted(x["route_id"] for x in network1["routes"])
if route_ids0 != route_ids1:
logging.debug("Different route_ids: %s != %s",
route_ids0, route_ids1)
logging.debug("Different route_ids: %s != %s", route_ids0, route_ids1)
return False
routes0 = sorted(network0['routes'], key=lambda x: x['route_id'])
routes1 = sorted(network1['routes'], key=lambda x: x['route_id'])
routes0 = sorted(network0["routes"], key=lambda x: x["route_id"])
routes1 = sorted(network1["routes"], key=lambda x: x["route_id"])
# Keys to compare routes. 'name' key is omitted since RouteMaster
# can get its name from one of its Routes unpredictably.
route_keys = ('type', 'ref', 'colour', 'route_id')
route_keys = ("type", "ref", "colour", "route_id")
for route0, route1 in zip(routes0, routes1):
route0_props = tuple(route0[k] for k in route_keys)
route1_props = tuple(route1[k] for k in route_keys)
if route0_props != route1_props:
logging.debug("Route props of '%s' are different: %s, %s",
route0['route_id'], route0_props, route1_props)
logging.debug(
"Route props of '%s' are different: %s, %s",
route0["route_id"],
route0_props,
route1_props,
)
return False
itineraries0 = sorted(route0['itineraries'], key=itinerary_comparator)
itineraries1 = sorted(route1['itineraries'], key=itinerary_comparator)
itineraries0 = sorted(route0["itineraries"], key=itinerary_comparator)
itineraries1 = sorted(route1["itineraries"], key=itinerary_comparator)
for itin0, itin1 in zip(itineraries0, itineraries1):
if itin0['interval'] != itin1['interval']:
logging.debug("Different interval: %d != %d at route %s '%s'",
itin0['interval'], itin1['interval'],
route0['route_id'], route0['name'])
if itin0["interval"] != itin1["interval"]:
logging.debug(
"Different interval: %d != %d at route %s '%s'",
itin0["interval"],
itin1["interval"],
route0["route_id"],
route0["name"],
)
return False
if itin0['stops'] != itin1['stops']:
logging.debug("Different stops at route %s '%s'",
route0['route_id'], route0['name'])
if itin0["stops"] != itin1["stops"]:
logging.debug(
"Different stops at route %s '%s'",
route0["route_id"],
route0["name"],
)
return False
return True

View file

@@ -10,10 +10,11 @@
affect the process_subways.py output really doesn't change it.
"""
import sys
import json
import logging
from common import compare_stops, compare_transfers, compare_networks
import sys
from common import compare_networks, compare_stops, compare_transfers
def compare_jsons(cache0, cache1):
@@ -28,21 +29,21 @@ def compare_jsons(cache0, cache1):
for name in city_names0:
city0 = cache0[name]
city1 = cache1[name]
if not compare_networks(city0['network'], city1['network']):
if not compare_networks(city0["network"], city1["network"]):
return False
stop_ids0 = sorted(city0['stops'].keys())
stop_ids1 = sorted(city1['stops'].keys())
stop_ids0 = sorted(city0["stops"].keys())
stop_ids1 = sorted(city1["stops"].keys())
if stop_ids0 != stop_ids1:
logging.debug("Different stop_ids")
return False
stops0 = [v for k, v in sorted(city0['stops'].items())]
stops1 = [v for k, v in sorted(city1['stops'].items())]
stops0 = [v for k, v in sorted(city0["stops"].items())]
stops1 = [v for k, v in sorted(city1["stops"].items())]
for stop0, stop1 in zip(stops0, stops1):
if not compare_stops(stop0, stop1):
return False
if not compare_transfers(city0['transfers'], city1['transfers']):
if not compare_transfers(city0["transfers"], city1["transfers"]):
return False
return True
@@ -57,8 +58,8 @@ if __name__ == "__main__":
path0, path1 = sys.argv[1:3]
j0 = json.load(open(path0, encoding='utf-8'))
j1 = json.load(open(path1, encoding='utf-8'))
j0 = json.load(open(path0, encoding="utf-8"))
j1 = json.load(open(path1, encoding="utf-8"))
equal = compare_jsons(j0, j1)

View file

@@ -10,38 +10,39 @@
affect the process_subways.py output really doesn't change it.
"""
import sys
import json
import logging
from common import compare_stops, compare_transfers, compare_networks
import sys
from common import compare_networks, compare_stops, compare_transfers
def compare_jsons(result0, result1):
"""Compares two objects which are results of subway generation"""
network_names0 = sorted([x['network'] for x in result0['networks']])
network_names1 = sorted([x['network'] for x in result1['networks']])
network_names0 = sorted([x["network"] for x in result0["networks"]])
network_names1 = sorted([x["network"] for x in result1["networks"]])
if network_names0 != network_names1:
logging.debug("Different list of network names!")
return False
networks0 = sorted(result0['networks'], key=lambda x: x['network'])
networks1 = sorted(result1['networks'], key=lambda x: x['network'])
networks0 = sorted(result0["networks"], key=lambda x: x["network"])
networks1 = sorted(result1["networks"], key=lambda x: x["network"])
for network0, network1 in zip(networks0, networks1):
if not compare_networks(network0, network1):
return False
stop_ids0 = sorted(x['id'] for x in result0['stops'])
stop_ids1 = sorted(x['id'] for x in result1['stops'])
stop_ids0 = sorted(x["id"] for x in result0["stops"])
stop_ids1 = sorted(x["id"] for x in result1["stops"])
if stop_ids0 != stop_ids1:
logging.debug("Different stop_ids")
return False
stops0 = sorted(result0['stops'], key=lambda x: x['id'])
stops1 = sorted(result1['stops'], key=lambda x: x['id'])
stops0 = sorted(result0["stops"], key=lambda x: x["id"])
stops1 = sorted(result1["stops"], key=lambda x: x["id"])
for stop0, stop1 in zip(stops0, stops1):
if not compare_stops(stop0, stop1):
return False
if not compare_transfers(result0['transfers'], result1['transfers']):
if not compare_transfers(result0["transfers"], result1["transfers"]):
return False
return True
@@ -56,8 +57,8 @@ if __name__ == "__main__":
path0, path1 = sys.argv[1:3]
j0 = json.load(open(path0, encoding='utf-8'))
j1 = json.load(open(path1, encoding='utf-8'))
j0 = json.load(open(path0, encoding="utf-8"))
j1 = json.load(open(path1, encoding="utf-8"))
equal = compare_jsons(j0, j1)

View file

@@ -2,153 +2,153 @@ import re
# Source: https://www.w3.org/TR/css3-color/#svg-color
CSS_COLOURS = {
'aliceblue': '#f0f8ff',
'antiquewhite': '#faebd7',
'aqua': '#00ffff',
'aquamarine': '#7fffd4',
'azure': '#f0ffff',
'beige': '#f5f5dc',
'bisque': '#ffe4c4',
'black': '#000000',
'blanchedalmond': '#ffebcd',
'blue': '#0000ff',
'blueviolet': '#8a2be2',
'brown': '#a52a2a',
'burlywood': '#deb887',
'cadetblue': '#5f9ea0',
'chartreuse': '#7fff00',
'chocolate': '#d2691e',
'coral': '#ff7f50',
'cornflowerblue': '#6495ed',
'cornsilk': '#fff8dc',
'crimson': '#dc143c',
'cyan': '#00ffff',
'darkblue': '#00008b',
'darkcyan': '#008b8b',
'darkgoldenrod': '#b8860b',
'darkgray': '#a9a9a9',
'darkgreen': '#006400',
'darkgrey': '#a9a9a9',
'darkkhaki': '#bdb76b',
'darkmagenta': '#8b008b',
'darkolivegreen': '#556b2f',
'darkorange': '#ff8c00',
'darkorchid': '#9932cc',
'darkred': '#8b0000',
'darksalmon': '#e9967a',
'darkseagreen': '#8fbc8f',
'darkslateblue': '#483d8b',
'darkslategray': '#2f4f4f',
'darkslategrey': '#2f4f4f',
'darkturquoise': '#00ced1',
'darkviolet': '#9400d3',
'deeppink': '#ff1493',
'deepskyblue': '#00bfff',
'dimgray': '#696969',
'dimgrey': '#696969',
'dodgerblue': '#1e90ff',
'firebrick': '#b22222',
'floralwhite': '#fffaf0',
'forestgreen': '#228b22',
'fuchsia': '#ff00ff',
'gainsboro': '#dcdcdc',
'ghostwhite': '#f8f8ff',
'gold': '#ffd700',
'goldenrod': '#daa520',
'gray': '#808080',
'green': '#008000',
'greenyellow': '#adff2f',
'grey': '#808080',
'honeydew': '#f0fff0',
'hotpink': '#ff69b4',
'indianred': '#cd5c5c',
'indigo': '#4b0082',
'ivory': '#fffff0',
'khaki': '#f0e68c',
'lavender': '#e6e6fa',
'lavenderblush': '#fff0f5',
'lawngreen': '#7cfc00',
'lemonchiffon': '#fffacd',
'lightblue': '#add8e6',
'lightcoral': '#f08080',
'lightcyan': '#e0ffff',
'lightgoldenrodyellow': '#fafad2',
'lightgray': '#d3d3d3',
'lightgreen': '#90ee90',
'lightgrey': '#d3d3d3',
'lightpink': '#ffb6c1',
'lightsalmon': '#ffa07a',
'lightseagreen': '#20b2aa',
'lightskyblue': '#87cefa',
'lightslategray': '#778899',
'lightslategrey': '#778899',
'lightsteelblue': '#b0c4de',
'lightyellow': '#ffffe0',
'lime': '#00ff00',
'limegreen': '#32cd32',
'linen': '#faf0e6',
'magenta': '#ff00ff',
'maroon': '#800000',
'mediumaquamarine': '#66cdaa',
'mediumblue': '#0000cd',
'mediumorchid': '#ba55d3',
'mediumpurple': '#9370db',
'mediumseagreen': '#3cb371',
'mediumslateblue': '#7b68ee',
'mediumspringgreen': '#00fa9a',
'mediumturquoise': '#48d1cc',
'mediumvioletred': '#c71585',
'midnightblue': '#191970',
'mintcream': '#f5fffa',
'mistyrose': '#ffe4e1',
'moccasin': '#ffe4b5',
'navajowhite': '#ffdead',
'navy': '#000080',
'oldlace': '#fdf5e6',
'olive': '#808000',
'olivedrab': '#6b8e23',
'orange': '#ffa500',
'orangered': '#ff4500',
'orchid': '#da70d6',
'palegoldenrod': '#eee8aa',
'palegreen': '#98fb98',
'paleturquoise': '#afeeee',
'palevioletred': '#db7093',
'papayawhip': '#ffefd5',
'peachpuff': '#ffdab9',
'peru': '#cd853f',
'pink': '#ffc0cb',
'plum': '#dda0dd',
'powderblue': '#b0e0e6',
'purple': '#800080',
'red': '#ff0000',
'rosybrown': '#bc8f8f',
'royalblue': '#4169e1',
'saddlebrown': '#8b4513',
'salmon': '#fa8072',
'sandybrown': '#f4a460',
'seagreen': '#2e8b57',
'seashell': '#fff5ee',
'sienna': '#a0522d',
'silver': '#c0c0c0',
'skyblue': '#87ceeb',
'slateblue': '#6a5acd',
'slategray': '#708090',
'slategrey': '#708090',
'snow': '#fffafa',
'springgreen': '#00ff7f',
'steelblue': '#4682b4',
'tan': '#d2b48c',
'teal': '#008080',
'thistle': '#d8bfd8',
'tomato': '#ff6347',
'turquoise': '#40e0d0',
'violet': '#ee82ee',
'wheat': '#f5deb3',
'white': '#ffffff',
'whitesmoke': '#f5f5f5',
'yellow': '#ffff00',
'yellowgreen': '#9acd32',
"aliceblue": "#f0f8ff",
"antiquewhite": "#faebd7",
"aqua": "#00ffff",
"aquamarine": "#7fffd4",
"azure": "#f0ffff",
"beige": "#f5f5dc",
"bisque": "#ffe4c4",
"black": "#000000",
"blanchedalmond": "#ffebcd",
"blue": "#0000ff",
"blueviolet": "#8a2be2",
"brown": "#a52a2a",
"burlywood": "#deb887",
"cadetblue": "#5f9ea0",
"chartreuse": "#7fff00",
"chocolate": "#d2691e",
"coral": "#ff7f50",
"cornflowerblue": "#6495ed",
"cornsilk": "#fff8dc",
"crimson": "#dc143c",
"cyan": "#00ffff",
"darkblue": "#00008b",
"darkcyan": "#008b8b",
"darkgoldenrod": "#b8860b",
"darkgray": "#a9a9a9",
"darkgreen": "#006400",
"darkgrey": "#a9a9a9",
"darkkhaki": "#bdb76b",
"darkmagenta": "#8b008b",
"darkolivegreen": "#556b2f",
"darkorange": "#ff8c00",
"darkorchid": "#9932cc",
"darkred": "#8b0000",
"darksalmon": "#e9967a",
"darkseagreen": "#8fbc8f",
"darkslateblue": "#483d8b",
"darkslategray": "#2f4f4f",
"darkslategrey": "#2f4f4f",
"darkturquoise": "#00ced1",
"darkviolet": "#9400d3",
"deeppink": "#ff1493",
"deepskyblue": "#00bfff",
"dimgray": "#696969",
"dimgrey": "#696969",
"dodgerblue": "#1e90ff",
"firebrick": "#b22222",
"floralwhite": "#fffaf0",
"forestgreen": "#228b22",
"fuchsia": "#ff00ff",
"gainsboro": "#dcdcdc",
"ghostwhite": "#f8f8ff",
"gold": "#ffd700",
"goldenrod": "#daa520",
"gray": "#808080",
"green": "#008000",
"greenyellow": "#adff2f",
"grey": "#808080",
"honeydew": "#f0fff0",
"hotpink": "#ff69b4",
"indianred": "#cd5c5c",
"indigo": "#4b0082",
"ivory": "#fffff0",
"khaki": "#f0e68c",
"lavender": "#e6e6fa",
"lavenderblush": "#fff0f5",
"lawngreen": "#7cfc00",
"lemonchiffon": "#fffacd",
"lightblue": "#add8e6",
"lightcoral": "#f08080",
"lightcyan": "#e0ffff",
"lightgoldenrodyellow": "#fafad2",
"lightgray": "#d3d3d3",
"lightgreen": "#90ee90",
"lightgrey": "#d3d3d3",
"lightpink": "#ffb6c1",
"lightsalmon": "#ffa07a",
"lightseagreen": "#20b2aa",
"lightskyblue": "#87cefa",
"lightslategray": "#778899",
"lightslategrey": "#778899",
"lightsteelblue": "#b0c4de",
"lightyellow": "#ffffe0",
"lime": "#00ff00",
"limegreen": "#32cd32",
"linen": "#faf0e6",
"magenta": "#ff00ff",
"maroon": "#800000",
"mediumaquamarine": "#66cdaa",
"mediumblue": "#0000cd",
"mediumorchid": "#ba55d3",
"mediumpurple": "#9370db",
"mediumseagreen": "#3cb371",
"mediumslateblue": "#7b68ee",
"mediumspringgreen": "#00fa9a",
"mediumturquoise": "#48d1cc",
"mediumvioletred": "#c71585",
"midnightblue": "#191970",
"mintcream": "#f5fffa",
"mistyrose": "#ffe4e1",
"moccasin": "#ffe4b5",
"navajowhite": "#ffdead",
"navy": "#000080",
"oldlace": "#fdf5e6",
"olive": "#808000",
"olivedrab": "#6b8e23",
"orange": "#ffa500",
"orangered": "#ff4500",
"orchid": "#da70d6",
"palegoldenrod": "#eee8aa",
"palegreen": "#98fb98",
"paleturquoise": "#afeeee",
"palevioletred": "#db7093",
"papayawhip": "#ffefd5",
"peachpuff": "#ffdab9",
"peru": "#cd853f",
"pink": "#ffc0cb",
"plum": "#dda0dd",
"powderblue": "#b0e0e6",
"purple": "#800080",
"red": "#ff0000",
"rosybrown": "#bc8f8f",
"royalblue": "#4169e1",
"saddlebrown": "#8b4513",
"salmon": "#fa8072",
"sandybrown": "#f4a460",
"seagreen": "#2e8b57",
"seashell": "#fff5ee",
"sienna": "#a0522d",
"silver": "#c0c0c0",
"skyblue": "#87ceeb",
"slateblue": "#6a5acd",
"slategray": "#708090",
"slategrey": "#708090",
"snow": "#fffafa",
"springgreen": "#00ff7f",
"steelblue": "#4682b4",
"tan": "#d2b48c",
"teal": "#008080",
"thistle": "#d8bfd8",
"tomato": "#ff6347",
"turquoise": "#40e0d0",
"violet": "#ee82ee",
"wheat": "#f5deb3",
"white": "#ffffff",
"whitesmoke": "#f5f5f5",
"yellow": "#ffff00",
"yellowgreen": "#9acd32",
}
@@ -158,8 +158,8 @@ def normalize_colour(c):
c = c.strip().lower()
if c in CSS_COLOURS:
return CSS_COLOURS[c]
if re.match(r'^#?[0-9a-f]{3}([0-9a-f]{3})?$', c):
if re.match(r"^#?[0-9a-f]{3}([0-9a-f]{3})?$", c):
if len(c) == 4:
return c[0]+c[1]+c[1]+c[2]+c[2]+c[3]+c[3]
return c[0] + c[1] + c[1] + c[2] + c[2] + c[3] + c[3]
return c
raise ValueError('Unknown colour code: {}'.format(c))
raise ValueError("Unknown colour code: {}".format(c))
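The reformatted digit expansion behaves exactly as before: a three-digit code is widened by doubling each hex digit. Illustrative calls, following the code above:

    normalize_colour("#abc")   ->  "#aabbcc"
    normalize_colour("red")    ->  "#ff0000"
    normalize_colour("bogus")  raises ValueError: Unknown colour code: bogus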

View file

@@ -31,5 +31,5 @@ def make_disjoint_metro_polygons():
print("END")
if __name__ == '__main__':
if __name__ == "__main__":
make_disjoint_metro_polygons()

View file

@@ -4,25 +4,29 @@ import json
from process_subways import download_cities
if __name__ == '__main__':
if __name__ == "__main__":
arg_parser = argparse.ArgumentParser(
description="""
This script generates a list of good/all network names.
It is used by subway render to generate the list of network at frontend.
It uses two sources: a mapsme.json validator output with good networks, and
a google spreadsheet with networks for the process_subways.download_cities()
function.""",
description=(
"""This script generates a list of good/all network names. It is
used by subway render to generate the list of network at frontend.
It uses two sources: a mapsme.json validator output with good
networks, and a google spreadsheet with networks for the
process_subways.download_cities() function."""
),
formatter_class=argparse.RawTextHelpFormatter,
)
arg_parser.add_argument(
'subway_json_file',
type=argparse.FileType('r'),
help="Validator output defined by -o option of process_subways.py script",
"subway_json_file",
type=argparse.FileType("r"),
help=(
"Validator output defined by -o option "
"of process_subways.py script",
),
)
arg_parser.add_argument(
'--with-bad',
"--with-bad",
action="store_true",
help="Whether to include cities validation of which was failed",
)
@@ -34,7 +38,7 @@ if __name__ == '__main__':
subway_json = json.load(subway_json_file)
good_cities = set(
n.get('network', n.get('title')) for n in subway_json['networks']
n.get("network", n.get("title")) for n in subway_json["networks"]
)
cities = download_cities()

View file

@@ -11,7 +11,6 @@ import urllib.parse
import urllib.request
import processors
from subway_io import (
dump_yaml,
load_xml,
@@ -30,30 +29,30 @@ from subway_structure import (
def overpass_request(overground, overpass_api, bboxes):
query = '[out:json][timeout:1000];('
query = "[out:json][timeout:1000];("
modes = MODES_OVERGROUND if overground else MODES_RAPID
for bbox in bboxes:
bbox_part = '({})'.format(','.join(str(coord) for coord in bbox))
query += '('
bbox_part = "({})".format(",".join(str(coord) for coord in bbox))
query += "("
for mode in modes:
query += 'rel[route="{}"]{};'.format(mode, bbox_part)
query += ');'
query += 'rel(br)[type=route_master];'
query += ");"
query += "rel(br)[type=route_master];"
if not overground:
query += 'node[railway=subway_entrance]{};'.format(bbox_part)
query += 'rel[public_transport=stop_area]{};'.format(bbox_part)
query += "node[railway=subway_entrance]{};".format(bbox_part)
query += "rel[public_transport=stop_area]{};".format(bbox_part)
query += (
'rel(br)[type=public_transport][public_transport=stop_area_group];'
"rel(br)[type=public_transport][public_transport=stop_area_group];"
)
query += ');(._;>>;);out body center qt;'
logging.debug('Query: %s', query)
url = '{}?data={}'.format(overpass_api, urllib.parse.quote(query))
query += ");(._;>>;);out body center qt;"
logging.info("Query: %s", query)
url = "{}?data={}".format(overpass_api, urllib.parse.quote(query))
response = urllib.request.urlopen(url, timeout=1000)
if response.getcode() != 200:
raise Exception(
'Failed to query Overpass API: HTTP {}'.format(response.getcode())
"Failed to query Overpass API: HTTP {}".format(response.getcode())
)
return json.load(response)['elements']
return json.load(response)["elements"]
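For a sense of what this function assembles: with a single bbox, and assuming MODES_RAPID is a pair such as ("subway", "light_rail") (its definition is outside this diff), the query is one union of per-mode route lookups plus the parent route_master and stop_area lookups, roughly:

    [out:json][timeout:1000];(
      (rel[route="subway"](50.3,30.2,50.6,30.8);
       rel[route="light_rail"](50.3,30.2,50.6,30.8););
      rel(br)[type=route_master];
      node[railway=subway_entrance](50.3,30.2,50.6,30.8);
      rel[public_transport=stop_area](50.3,30.2,50.6,30.8);
      rel(br)[type=public_transport][public_transport=stop_area_group];
    );(._;>>;);out body center qt;

The bbox numbers are invented and the line breaks are added for readability; the code builds the query as a single string.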
def multi_overpass(overground, overpass_api, bboxes):
@@ -63,16 +62,13 @@ def multi_overpass(overground, overpass_api, bboxes):
for i in range(0, len(bboxes) + SLICE_SIZE - 1, SLICE_SIZE):
if i > 0:
time.sleep(INTERREQUEST_WAIT)
result.extend(
overpass_request(
overground, overpass_api, bboxes[i : i + SLICE_SIZE]
)
)
bboxes_i = bboxes[i : i + SLICE_SIZE] # noqa E203
result.extend(overpass_request(overground, overpass_api, bboxes_i))
return result
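The noqa marker works around a known disagreement between the two tools: black puts spaces around the colon of a slice when its bounds are expressions, while flake8's default E203 check flags whitespace before ':'. For example:

    bboxes[i : i + SLICE_SIZE]   # black's output; plain flake8 reports E203
    bboxes[i:i + SLICE_SIZE]     # passes E203, but black reformats it back

An alternative to per-line noqa comments would be adding E203 to extend-ignore in the flake8 config.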
def slugify(name):
return re.sub(r'[^a-z0-9_-]+', '', name.lower().replace(' ', '_'))
return re.sub(r"[^a-z0-9_-]+", "", name.lower().replace(" ", "_"))
def calculate_centers(elements):
@@ -89,13 +85,13 @@ def calculate_centers(elements):
def calculate_way_center(el):
# If element has been queried via overpass-api with 'out center;'
# clause then ways already have 'center' attribute
if 'center' in el:
ways[el['id']] = (el['center']['lat'], el['center']['lon'])
if "center" in el:
ways[el["id"]] = (el["center"]["lat"], el["center"]["lon"])
return
center = [0, 0]
count = 0
way_nodes = el['nodes']
way_nodes_len = len(el['nodes'])
way_nodes = el["nodes"]
way_nodes_len = len(el["nodes"])
for i, nd in enumerate(way_nodes):
if nd not in nodes:
continue
@@ -110,20 +106,20 @@ def calculate_centers(elements):
center[1] += nodes[nd][1]
count += 1
if count > 0:
el['center'] = {'lat': center[0] / count, 'lon': center[1] / count}
ways[el['id']] = (el['center']['lat'], el['center']['lon'])
el["center"] = {"lat": center[0] / count, "lon": center[1] / count}
ways[el["id"]] = (el["center"]["lat"], el["center"]["lon"])
def calculate_relation_center(el):
# If element has been queried via overpass-api with 'out center;'
# clause then some relations already have 'center' attribute
if 'center' in el:
relations[el['id']] = (el['center']['lat'], el['center']['lon'])
if "center" in el:
relations[el["id"]] = (el["center"]["lat"], el["center"]["lon"])
return True
center = [0, 0]
count = 0
for m in el.get('members', []):
if m['type'] == 'relation' and m['ref'] not in relations:
if m['ref'] in empty_relations:
for m in el.get("members", []):
if m["type"] == "relation" and m["ref"] not in relations:
if m["ref"] in empty_relations:
# Ignore empty child relations
continue
else:
@@ -131,31 +127,31 @@ def calculate_centers(elements):
return False
member_container = (
nodes
if m['type'] == 'node'
if m["type"] == "node"
else ways
if m['type'] == 'way'
if m["type"] == "way"
else relations
)
if m['ref'] in member_container:
center[0] += member_container[m['ref']][0]
center[1] += member_container[m['ref']][1]
if m["ref"] in member_container:
center[0] += member_container[m["ref"]][0]
center[1] += member_container[m["ref"]][1]
count += 1
if count == 0:
empty_relations.add(el['id'])
empty_relations.add(el["id"])
else:
el['center'] = {'lat': center[0] / count, 'lon': center[1] / count}
relations[el['id']] = (el['center']['lat'], el['center']['lon'])
el["center"] = {"lat": center[0] / count, "lon": center[1] / count}
relations[el["id"]] = (el["center"]["lat"], el["center"]["lon"])
return True
relations_without_center = []
for el in elements:
if el['type'] == 'node':
nodes[el['id']] = (el['lat'], el['lon'])
elif el['type'] == 'way':
if 'nodes' in el:
if el["type"] == "node":
nodes[el["id"]] = (el["lat"], el["lon"])
elif el["type"] == "way":
if "nodes" in el:
calculate_way_center(el)
elif el['type'] == 'relation':
elif el["type"] == "relation":
if not calculate_relation_center(el):
relations_without_center.append(el)
@@ -173,14 +169,14 @@ def calculate_centers(elements):
logging.error(
"Cannot calculate center for the relations (%d in total): %s%s",
len(relations_without_center),
', '.join(str(rel['id']) for rel in relations_without_center[:20]),
", ".join(str(rel["id"]) for rel in relations_without_center[:20]),
", ..." if len(relations_without_center) > 20 else "",
)
if empty_relations:
logging.warning(
"Empty relations (%d in total): %s%s",
len(empty_relations),
', '.join(str(x) for x in list(empty_relations)[:20]),
", ".join(str(x) for x in list(empty_relations)[:20]),
", ..." if len(empty_relations) > 20 else "",
)
@@ -223,72 +219,72 @@ def validate_cities(cities):
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
'-i',
'--source',
help='File to write backup of OSM data, or to read data from',
"-i",
"--source",
help="File to write backup of OSM data, or to read data from",
)
parser.add_argument(
'-x', '--xml', help='OSM extract with routes, to read data from'
"-x", "--xml", help="OSM extract with routes, to read data from"
)
parser.add_argument(
'--overpass-api',
default='http://overpass-api.de/api/interpreter',
"--overpass-api",
default="http://overpass-api.de/api/interpreter",
help="Overpass API URL",
)
parser.add_argument(
'-q',
'--quiet',
action='store_true',
help='Show only warnings and errors',
"-q",
"--quiet",
action="store_true",
help="Show only warnings and errors",
)
parser.add_argument(
'-c', '--city', help='Validate only a single city or a country'
"-c", "--city", help="Validate only a single city or a country"
)
parser.add_argument(
'-t',
'--overground',
action='store_true',
help='Process overground transport instead of subways',
"-t",
"--overground",
action="store_true",
help="Process overground transport instead of subways",
)
parser.add_argument(
'-e',
'--entrances',
type=argparse.FileType('w', encoding='utf-8'),
help='Export unused subway entrances as GeoJSON here',
"-e",
"--entrances",
type=argparse.FileType("w", encoding="utf-8"),
help="Export unused subway entrances as GeoJSON here",
)
parser.add_argument(
'-l',
'--log',
type=argparse.FileType('w', encoding='utf-8'),
help='Validation JSON file name',
"-l",
"--log",
type=argparse.FileType("w", encoding="utf-8"),
help="Validation JSON file name",
)
for processor_name, processor in inspect.getmembers(
processors, inspect.ismodule
processors, inspect.ismodule
):
if not processor_name.startswith("_"):
parser.add_argument(
f'--output-{processor_name}',
f"--output-{processor_name}",
help=(
'Processed metro systems output filename '
f'in {processor_name.upper()} format'
"Processed metro systems output filename "
f"in {processor_name.upper()} format"
),
)
parser.add_argument('--cache', help='Cache file name for processed data')
parser.add_argument("--cache", help="Cache file name for processed data")
parser.add_argument(
'-r', '--recovery-path', help='Cache file name for error recovery'
"-r", "--recovery-path", help="Cache file name for error recovery"
)
parser.add_argument(
'-d', '--dump', help='Make a YAML file for a city data'
"-d", "--dump", help="Make a YAML file for a city data"
)
parser.add_argument(
'-j', '--geojson', help='Make a GeoJSON file for a city data'
"-j", "--geojson", help="Make a GeoJSON file for a city data"
)
parser.add_argument(
'--crude',
action='store_true',
help='Do not use OSM railway geometry for GeoJSON',
"--crude",
action="store_true",
help="Do not use OSM railway geometry for GeoJSON",
)
options = parser.parse_args()
@@ -298,8 +294,8 @@ def main():
log_level = logging.INFO
logging.basicConfig(
level=log_level,
datefmt='%H:%M:%S',
format='%(asctime)s %(levelname)-7s %(message)s',
datefmt="%H:%M:%S",
format="%(asctime)s %(levelname)-7s %(message)s",
)
# Downloading cities from Google Spreadsheets
@@ -311,7 +307,7 @@ def main():
if c.name == options.city or c.country == options.city
]
if not cities:
logging.error('No cities to process')
logging.error("No cities to process")
sys.exit(2)
# Augment cities with recovery data
@@ -321,59 +317,59 @@ def main():
for city in cities:
city.recovery_data = recovery_data.get(city.name, None)
logging.info('Read %s metro networks', len(cities))
logging.info("Read %s metro networks", len(cities))
# Reading cached json, loading XML or querying Overpass API
if options.source and os.path.exists(options.source):
logging.info('Reading %s', options.source)
with open(options.source, 'r') as f:
logging.info("Reading %s", options.source)
with open(options.source, "r") as f:
osm = json.load(f)
if 'elements' in osm:
osm = osm['elements']
if "elements" in osm:
osm = osm["elements"]
calculate_centers(osm)
elif options.xml:
logging.info('Reading %s', options.xml)
logging.info("Reading %s", options.xml)
osm = load_xml(options.xml)
calculate_centers(osm)
if options.source:
with open(options.source, 'w', encoding='utf-8') as f:
with open(options.source, "w", encoding="utf-8") as f:
json.dump(osm, f)
else:
if len(cities) > 10:
logging.error(
'Would not download that many cities from Overpass API, '
'choose a smaller set'
"Would not download that many cities from Overpass API, "
"choose a smaller set"
)
sys.exit(3)
bboxes = [c.bbox for c in cities]
logging.info('Downloading data from Overpass API')
logging.info("Downloading data from Overpass API")
osm = multi_overpass(options.overground, options.overpass_api, bboxes)
calculate_centers(osm)
if options.source:
with open(options.source, 'w', encoding='utf-8') as f:
with open(options.source, "w", encoding="utf-8") as f:
json.dump(osm, f)
logging.info('Downloaded %s elements', len(osm))
logging.info("Downloaded %s elements", len(osm))
logging.info('Sorting elements by city')
logging.info("Sorting elements by city")
add_osm_elements_to_cities(osm, cities)
logging.info('Building routes for each city')
logging.info("Building routes for each city")
good_cities = validate_cities(cities)
logging.info('Finding transfer stations')
logging.info("Finding transfer stations")
transfers = find_transfers(osm, cities)
good_city_names = set(c.name for c in good_cities)
logging.info(
'%s good cities: %s',
"%s good cities: %s",
len(good_city_names),
', '.join(sorted(good_city_names)),
", ".join(sorted(good_city_names)),
)
bad_city_names = set(c.name for c in cities) - good_city_names
logging.info(
'%s bad cities: %s',
"%s bad cities: %s",
len(bad_city_names),
', '.join(sorted(bad_city_names)),
", ".join(sorted(bad_city_names)),
)
if options.recovery_path:
@@ -386,46 +382,46 @@ def main():
if os.path.isdir(options.dump):
for c in cities:
with open(
os.path.join(options.dump, slugify(c.name) + '.yaml'),
'w',
encoding='utf-8',
os.path.join(options.dump, slugify(c.name) + ".yaml"),
"w",
encoding="utf-8",
) as f:
dump_yaml(c, f)
elif len(cities) == 1:
with open(options.dump, 'w', encoding='utf-8') as f:
with open(options.dump, "w", encoding="utf-8") as f:
dump_yaml(cities[0], f)
else:
logging.error('Cannot dump %s cities at once', len(cities))
logging.error("Cannot dump %s cities at once", len(cities))
if options.geojson:
if os.path.isdir(options.geojson):
for c in cities:
with open(
os.path.join(
options.geojson, slugify(c.name) + '.geojson'
options.geojson, slugify(c.name) + ".geojson"
),
'w',
encoding='utf-8',
"w",
encoding="utf-8",
) as f:
json.dump(make_geojson(c, not options.crude), f)
elif len(cities) == 1:
with open(options.geojson, 'w', encoding='utf-8') as f:
with open(options.geojson, "w", encoding="utf-8") as f:
json.dump(make_geojson(cities[0], not options.crude), f)
else:
logging.error(
'Cannot make a geojson of %s cities at once', len(cities)
"Cannot make a geojson of %s cities at once", len(cities)
)
if options.log:
res = []
for c in cities:
v = c.get_validation_result()
v['slug'] = slugify(c.name)
v["slug"] = slugify(c.name)
res.append(v)
json.dump(res, options.log, indent=2, ensure_ascii=False)
for processor_name, processor in inspect.getmembers(
processors, inspect.ismodule
processors, inspect.ismodule
):
option_name = f"output_{processor_name}"
@@ -436,5 +432,5 @@ def main():
processor.process(cities, transfers, filename, options.cache)
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@@ -1,2 +1,4 @@
# Import only those processors (modules) you want to use
from . import mapsme, gtfs
# Import only those processors (modules) you want to use.
# Ignore F401 "module imported but unused" violation since these modules
# are addressed via introspection.
from . import mapsme, gtfs # noqa F401
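The introspection the comment refers to is the discovery loop in process_subways.py, shown earlier in this commit, which treats every public module of this package as an output processor:

    for processor_name, processor in inspect.getmembers(
        processors, inspect.ismodule
    ):
        if not processor_name.startswith("_"):
            ...  # registers an --output-<processor_name> CLI option

so mapsme and gtfs must stay imported here even though nothing in this file references them by name.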

View file

@@ -328,7 +328,8 @@ def process(
transit_data = transit_to_dict(cities, transfers)
gtfs_data = transit_data_to_gtfs(transit_data)
# TODO: make universal cache for all processors, and apply the cache to GTFS
# TODO: make universal cache for all processors,
# and apply the cache to GTFS
make_gtfs(filename, gtfs_data)

View file

@@ -1,9 +1,14 @@
import json
import os
import logging
import os
from collections import defaultdict
from subway_structure import (
DISPLACEMENT_TOLERANCE,
distance,
el_center,
Station,
)
from ._common import (
DEFAULT_INTERVAL,
format_colour,
@@ -11,19 +16,12 @@ from ._common import (
SPEED_ON_TRANSFER,
TRANSFER_PENALTY,
)
from subway_structure import (
distance,
el_center,
Station,
DISPLACEMENT_TOLERANCE,
)
OSM_TYPES = {'n': (0, 'node'), 'w': (2, 'way'), 'r': (3, 'relation')}
OSM_TYPES = {"n": (0, "node"), "w": (2, "way"), "r": (3, "relation")}
ENTRANCE_PENALTY = 60 # seconds
SPEED_TO_ENTRANCE = 5 * KMPH_TO_MPS # m/s
SPEED_ON_LINE = 40 * KMPH_TO_MPS # m/s
DEFAULT_INTERVAL = 2.5 * 60 # seconds
def uid(elid, typ=None):
@@ -32,7 +30,7 @@
if not typ:
osm_id = (osm_id << 2) + OSM_TYPES[t][0]
elif typ != t:
raise Exception('Got {}, expected {}'.format(elid, typ))
raise Exception("Got {}, expected {}".format(elid, typ))
return osm_id << 1
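For orientation: uid() packs an OSM element id into a single integer. When no expected type is passed, the numeric id is shifted left two bits to hold the type code from OSM_TYPES, then the whole value is shifted left once more; the parsing of t and osm_id from elid happens just above this hunk and is not shown here. Worked examples under that reading:

    uid("n12345")       ->  ((12345 << 2) + 0) << 1  =  98760
    uid("w12345", "w")  ->  12345 << 1  =  24690  (the type bits are skipped when typ is passed)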
@@ -67,7 +65,8 @@ def if_object_is_used(method):
class MapsmeCache:
def __init__(self, cache_path, cities):
if not cache_path:
# cache is not used, all actions with cache must be silently skipped
# Cache is not used,
# all actions with cache must be silently skipped
self.is_used = False
return
self.cache_path = cache_path
@@ -75,7 +74,7 @@
self.cache = {}
if os.path.exists(cache_path):
try:
with open(cache_path, 'r', encoding='utf-8') as f:
with open(cache_path, "r", encoding="utf-8") as f:
self.cache = json.load(f)
except json.decoder.JSONDecodeError:
logging.warning(
@@ -94,9 +93,9 @@
not moved far away.
"""
city_cache_data = self.cache[city.name]
for stoparea_id, cached_stoparea in city_cache_data['stops'].items():
station_id = cached_stoparea['osm_type'][0] + str(
cached_stoparea['osm_id']
for stoparea_id, cached_stoparea in city_cache_data["stops"].items():
station_id = cached_stoparea["osm_type"][0] + str(
cached_stoparea["osm_id"]
)
city_station = city.elements.get(station_id)
if not city_station or not Station.is_station(
@@ -105,7 +104,7 @@
return False
station_coords = el_center(city_station)
cached_station_coords = tuple(
cached_stoparea[coord] for coord in ('lon', 'lat')
cached_stoparea[coord] for coord in ("lon", "lat")
)
displacement = distance(station_coords, cached_station_coords)
if displacement > DISPLACEMENT_TOLERANCE:
@@ -121,8 +120,8 @@
if not city.is_good and city.name in self.cache:
city_cached_data = self.cache[city.name]
if self._is_cached_city_usable(city):
stops.update(city_cached_data['stops'])
networks.append(city_cached_data['network'])
stops.update(city_cached_data["stops"])
networks.append(city_cached_data["network"])
logging.info("Taking %s from cache", city.name)
self.recovered_city_names.add(city.name)
@@ -131,7 +130,7 @@
"""Add transfers from usable cached cities to 'transfers' dict
passed as argument."""
for city_name in self.recovered_city_names:
city_cached_transfers = self.cache[city_name]['transfers']
city_cached_transfers = self.cache[city_name]["transfers"]
for stop1_uid, stop2_uid, transfer_time in city_cached_transfers:
if (stop1_uid, stop2_uid) not in transfers:
transfers[(stop1_uid, stop2_uid)] = transfer_time
@@ -141,9 +140,10 @@
"""Create/replace one cache element with new data container.
This should be done for each good city."""
self.cache[city_name] = {
'network': network,
'stops': {}, # stoparea el_id -> jsonified stop data
'transfers': [], # list of tuples (stoparea1_uid, stoparea2_uid, time); uid1 < uid2
"network": network,
"stops": {}, # stoparea el_id -> jsonified stop data
"transfers": [], # list of tuples
# (stoparea1_uid, stoparea2_uid, time); uid1 < uid2
}
@if_object_is_used
@@ -157,7 +157,7 @@
"""Add stoparea to the cache of each city the stoparea is in."""
stoparea_uid = uid(stoparea_id)
for city_name in self.stop_cities[stoparea_uid]:
self.cache[city_name]['stops'][stoparea_id] = st
self.cache[city_name]["stops"][stoparea_id] = st
@if_object_is_used
def add_transfer(self, stoparea1_uid, stoparea2_uid, transfer_time):
@@ -167,14 +167,14 @@
& self.stop_cities[stoparea1_uid]
& self.stop_cities[stoparea2_uid]
):
self.cache[city_name]['transfers'].append(
self.cache[city_name]["transfers"].append(
(stoparea1_uid, stoparea2_uid, transfer_time)
)
@if_object_is_used
def save(self):
try:
with open(self.cache_path, 'w', encoding='utf-8') as f:
with open(self.cache_path, "w", encoding="utf-8") as f:
json.dump(self.cache, f, ensure_ascii=False)
except Exception as e:
logging.warning("Failed to save cache: %s", str(e))
@@ -192,14 +192,14 @@ def process(cities, transfers, filename, cache_path):
exits = []
min_distance = None
for n in nodes:
d = distance(center, (n['lon'], n['lat']))
d = distance(center, (n["lon"], n["lat"]))
if not min_distance:
min_distance = d * 2 / 3
elif d < min_distance:
continue
too_close = False
for e in exits:
d = distance((e['lon'], e['lat']), (n['lon'], n['lat']))
d = distance((e["lon"], e["lat"]), (n["lon"], n["lat"]))
if d < min_distance:
too_close = True
break
@@ -217,20 +217,20 @@ def process(cities, transfers, filename, cache_path):
cache.provide_stops_and_networks(stops, networks)
for city in good_cities:
network = {'network': city.name, 'routes': [], 'agency_id': city.id}
network = {"network": city.name, "routes": [], "agency_id": city.id}
cache.initialize_good_city(city.name, network)
for route in city:
routes = {
'type': route.mode,
'ref': route.ref,
'name': route.name,
'colour': format_colour(route.colour),
'route_id': uid(route.id, 'r'),
'itineraries': [],
"type": route.mode,
"ref": route.ref,
"name": route.name,
"colour": format_colour(route.colour),
"route_id": uid(route.id, "r"),
"itineraries": [],
}
if route.infill:
routes['casing'] = routes['colour']
routes['colour'] = format_colour(route.infill)
routes["casing"] = routes["colour"]
routes["colour"] = format_colour(route.infill)
for i, variant in enumerate(route):
itin = []
for stop in variant:
@@ -242,41 +242,42 @@ def process(cities, transfers, filename, cache_path):
round(stop.distance / SPEED_ON_LINE),
]
)
# Make exits from platform nodes, if we don't have proper exits
# Make exits from platform nodes,
# if we don't have proper exits
if (
len(stop.stoparea.entrances) + len(stop.stoparea.exits)
== 0
):
for pl in stop.stoparea.platforms:
pl_el = city.elements[pl]
if pl_el['type'] == 'node':
if pl_el["type"] == "node":
pl_nodes = [pl_el]
elif pl_el['type'] == 'way':
elif pl_el["type"] == "way":
pl_nodes = [
city.elements.get('n{}'.format(n))
for n in pl_el['nodes']
city.elements.get("n{}".format(n))
for n in pl_el["nodes"]
]
else:
pl_nodes = []
for m in pl_el['members']:
if m['type'] == 'way':
for m in pl_el["members"]:
if m["type"] == "way":
if (
'{}{}'.format(
m['type'][0], m['ref']
"{}{}".format(
m["type"][0], m["ref"]
)
in city.elements
):
pl_nodes.extend(
[
city.elements.get(
'n{}'.format(n)
"n{}".format(n)
)
for n in city.elements[
'{}{}'.format(
m['type'][0],
m['ref'],
"{}{}".format(
m["type"][0],
m["ref"],
)
]['nodes']
]["nodes"]
]
)
pl_nodes = [n for n in pl_nodes if n]
@@ -284,37 +285,39 @@ def process(cities, transfers, filename, cache_path):
stop.stoparea.centers[pl], pl_nodes
)
routes['itineraries'].append(
routes["itineraries"].append(
{
'stops': itin,
'interval': round(variant.interval or DEFAULT_INTERVAL),
"stops": itin,
"interval": round(
variant.interval or DEFAULT_INTERVAL
),
}
)
network['routes'].append(routes)
network["routes"].append(routes)
networks.append(network)
for stop_id, stop in stop_areas.items():
st = {
'name': stop.name,
'int_name': stop.int_name,
'lat': stop.center[1],
'lon': stop.center[0],
'osm_type': OSM_TYPES[stop.station.id[0]][1],
'osm_id': int(stop.station.id[1:]),
'id': uid(stop.id),
'entrances': [],
'exits': [],
"name": stop.name,
"int_name": stop.int_name,
"lat": stop.center[1],
"lon": stop.center[0],
"osm_type": OSM_TYPES[stop.station.id[0]][1],
"osm_id": int(stop.station.id[1:]),
"id": uid(stop.id),
"entrances": [],
"exits": [],
}
for e_l, k in ((stop.entrances, 'entrances'), (stop.exits, 'exits')):
for e_l, k in ((stop.entrances, "entrances"), (stop.exits, "exits")):
for e in e_l:
if e[0] == 'n':
if e[0] == "n":
st[k].append(
{
'osm_type': 'node',
'osm_id': int(e[1:]),
'lon': stop.centers[e][0],
'lat': stop.centers[e][1],
'distance': ENTRANCE_PENALTY
"osm_type": "node",
"osm_id": int(e[1:]),
"lon": stop.centers[e][0],
"lat": stop.centers[e][1],
"distance": ENTRANCE_PENALTY
+ round(
distance(stop.centers[e], stop.center)
/ SPEED_TO_ENTRANCE
@@ -325,31 +328,31 @@ def process(cities, transfers, filename, cache_path):
if stop.platforms:
for pl in stop.platforms:
for n in platform_nodes[pl]:
for k in ('entrances', 'exits'):
for k in ("entrances", "exits"):
st[k].append(
{
'osm_type': n['type'],
'osm_id': n['id'],
'lon': n['lon'],
'lat': n['lat'],
'distance': ENTRANCE_PENALTY
"osm_type": n["type"],
"osm_id": n["id"],
"lon": n["lon"],
"lat": n["lat"],
"distance": ENTRANCE_PENALTY
+ round(
distance(
(n['lon'], n['lat']), stop.center
(n["lon"], n["lat"]), stop.center
)
/ SPEED_TO_ENTRANCE
),
}
)
else:
for k in ('entrances', 'exits'):
for k in ("entrances", "exits"):
st[k].append(
{
'osm_type': OSM_TYPES[stop.station.id[0]][1],
'osm_id': int(stop.station.id[1:]),
'lon': stop.centers[stop.id][0],
'lat': stop.centers[stop.id][1],
'distance': 60,
"osm_type": OSM_TYPES[stop.station.id[0]][1],
"osm_id": int(stop.station.id[1:]),
"lon": stop.centers[stop.id][0],
"lat": stop.centers[stop.id][1],
"distance": 60,
}
)
@@ -385,9 +388,9 @@ def process(cities, transfers, filename, cache_path):
]
result = {
'stops': list(stops.values()),
'transfers': pairwise_transfers,
'networks': networks,
"stops": list(stops.values()),
"transfers": pairwise_transfers,
"networks": networks,
}
if not filename.lower().endswith("json"):

View file

@@ -1,14 +1,15 @@
#!/usr/bin/env python3
import json
import codecs
from lxml import etree
import sys
import kdtree
import json
import math
import re
import sys
import urllib.parse
import urllib.request
import kdtree
from lxml import etree
QUERY = """
[out:json][timeout:250][bbox:{{bbox}}];
@@ -32,17 +33,17 @@ out meta center qt;
def el_id(el):
return el['type'][0] + str(el.get('id', el.get('ref', '')))
return el["type"][0] + str(el.get("id", el.get("ref", "")))
class StationWrapper:
def __init__(self, st):
if 'center' in st:
self.coords = (st['center']['lon'], st['center']['lat'])
elif 'lon' in st:
self.coords = (st['lon'], st['lat'])
if "center" in st:
self.coords = (st["center"]["lon"], st["center"]["lat"])
elif "lon" in st:
self.coords = (st["lon"], st["lat"])
else:
raise Exception('Coordinates not found for station {}'.format(st))
raise Exception("Coordinates not found for station {}".format(st))
self.station = st
def __len__(self):
@@ -53,85 +54,85 @@ class StationWrapper:
def distance(self, other):
"""Calculate distance in meters."""
dx = math.radians(self[0] - other['lon']) * math.cos(
0.5 * math.radians(self[1] + other['lat'])
dx = math.radians(self[0] - other["lon"]) * math.cos(
0.5 * math.radians(self[1] + other["lat"])
)
dy = math.radians(self[1] - other['lat'])
dy = math.radians(self[1] - other["lat"])
return 6378137 * math.sqrt(dx * dx + dy * dy)
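This distance() is the standard equirectangular approximation: with the latitude/longitude deltas converted to radians,

    d ≈ R * sqrt(dx*dx + dy*dy),  dx = Δlon * cos((lat1 + lat2) / 2),  dy = Δlat,  R = 6378137 m

where R is the WGS84 equatorial radius. The error is negligible at the few-hundred-metre scale these scripts compare against MAX_DISTANCE.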
def overpass_request(bbox):
url = 'http://overpass-api.de/api/interpreter?data={}'.format(
urllib.parse.quote(QUERY.replace('{{bbox}}', bbox))
url = "http://overpass-api.de/api/interpreter?data={}".format(
urllib.parse.quote(QUERY.replace("{{bbox}}", bbox))
)
response = urllib.request.urlopen(url, timeout=1000)
if response.getcode() != 200:
raise Exception(
'Failed to query Overpass API: HTTP {}'.format(response.getcode())
"Failed to query Overpass API: HTTP {}".format(response.getcode())
)
reader = codecs.getreader('utf-8')
return json.load(reader(response))['elements']
reader = codecs.getreader("utf-8")
return json.load(reader(response))["elements"]
def add_stop_areas(src):
if not src:
raise Exception('Empty dataset provided to add_stop_areas')
raise Exception("Empty dataset provided to add_stop_areas")
# Add station=* tags to stations in subway and light_rail routes
stations = {}
for el in src:
if 'tags' in el and el['tags'].get('railway', None) == 'station':
if "tags" in el and el["tags"].get("railway", None) == "station":
stations[el_id(el)] = el
for el in src:
if (
el['type'] == 'relation'
and 'tags' in el
and el['tags'].get('route', None) in ('subway', 'light_rail')
el["type"] == "relation"
and "tags" in el
and el["tags"].get("route", None) in ("subway", "light_rail")
):
for m in el['members']:
for m in el["members"]:
st = stations.get(el_id(m), None)
if st and 'station' not in st['tags']:
st['tags']['station'] = el['tags']['route']
st['modified'] = True
if st and "station" not in st["tags"]:
st["tags"]["station"] = el["tags"]["route"]
st["modified"] = True
# Create a kd-tree out of subway stations
stations = kdtree.create(dimensions=2)
for el in src:
if 'tags' in el and el['tags'].get('station', None) in (
'subway',
'light_rail',
if "tags" in el and el["tags"].get("station", None) in (
"subway",
"light_rail",
):
stations.add(StationWrapper(el))
if stations.is_leaf:
raise Exception('No stations found')
raise Exception("No stations found")
# Populate a list of nearby subway exits and platforms for each station
MAX_DISTANCE = 300 # meters
stop_areas = {}
for el in src:
if 'tags' not in el:
if "tags" not in el:
continue
if 'station' in el['tags']:
if "station" in el["tags"]:
continue
if el['tags'].get('railway', None) not in (
'subway_entrance',
'platform',
) and el['tags'].get('public_transport', None) not in (
'platform',
'stop_position',
if el["tags"].get("railway", None) not in (
"subway_entrance",
"platform",
) and el["tags"].get("public_transport", None) not in (
"platform",
"stop_position",
):
continue
coords = el.get('center', el)
station = stations.search_nn((coords['lon'], coords['lat']))[0].data
coords = el.get("center", el)
station = stations.search_nn((coords["lon"], coords["lat"]))[0].data
if station.distance(coords) < MAX_DISTANCE:
k = (
station.station['id'],
station.station['tags'].get('name', 'station_with_no_name'),
station.station["id"],
station.station["tags"].get("name", "station_with_no_name"),
)
# Disregard exits and platforms that are differently named
if el['tags'].get('name', k[1]) == k[1]:
if el["tags"].get("name", k[1]) == k[1]:
if k not in stop_areas:
stop_areas[k] = {el_id(station.station): station.station}
stop_areas[k][el_id(el)] = el
@@ -139,11 +140,11 @@ def add_stop_areas(src):
# Find existing stop_area relations for stations and remove these stations
for el in src:
if (
el['type'] == 'relation'
and el['tags'].get('public_transport', None) == 'stop_area'
el["type"] == "relation"
and el["tags"].get("public_transport", None) == "stop_area"
):
found = False
for m in el['members']:
for m in el["members"]:
if found:
break
for st in stop_areas:
@@ -153,89 +154,90 @@ def add_stop_areas(src):
break
# Create OSM XML for new stop_area relations
root = etree.Element('osm', version='0.6')
root = etree.Element("osm", version="0.6")
rid = -1
for st, members in stop_areas.items():
rel = etree.SubElement(root, 'relation', id=str(rid))
rel = etree.SubElement(root, "relation", id=str(rid))
rid -= 1
etree.SubElement(rel, 'tag', k='type', v='public_transport')
etree.SubElement(rel, 'tag', k='public_transport', v='stop_area')
etree.SubElement(rel, 'tag', k='name', v=st[1])
etree.SubElement(rel, "tag", k="type", v="public_transport")
etree.SubElement(rel, "tag", k="public_transport", v="stop_area")
etree.SubElement(rel, "tag", k="name", v=st[1])
for m in members.values():
if (
m['tags'].get(
'railway', m['tags'].get('public_transport', None)
m["tags"].get(
"railway", m["tags"].get("public_transport", None)
)
== 'platform'
== "platform"
):
role = 'platform'
elif m['tags'].get('public_transport', None) == 'stop_position':
role = 'stop'
role = "platform"
elif m["tags"].get("public_transport", None) == "stop_position":
role = "stop"
else:
role = ''
role = ""
etree.SubElement(
rel, 'member', ref=str(m['id']), type=m['type'], role=role
rel, "member", ref=str(m["id"]), type=m["type"], role=role
)
# Add all downloaded elements
for el in src:
obj = etree.SubElement(root, el['type'])
obj = etree.SubElement(root, el["type"])
for a in (
'id',
'type',
'user',
'uid',
'version',
'changeset',
'timestamp',
'lat',
'lon',
"id",
"type",
"user",
"uid",
"version",
"changeset",
"timestamp",
"lat",
"lon",
):
if a in el:
obj.set(a, str(el[a]))
if 'modified' in el:
obj.set('action', 'modify')
if 'tags' in el:
for k, v in el['tags'].items():
etree.SubElement(obj, 'tag', k=k, v=v)
if 'members' in el:
for m in el['members']:
if "modified" in el:
obj.set("action", "modify")
if "tags" in el:
for k, v in el["tags"].items():
etree.SubElement(obj, "tag", k=k, v=v)
if "members" in el:
for m in el["members"]:
etree.SubElement(
obj,
'member',
ref=str(m['ref']),
type=m['type'],
role=m.get('role', ''),
"member",
ref=str(m["ref"]),
type=m["type"],
role=m.get("role", ""),
)
if 'nodes' in el:
for n in el['nodes']:
etree.SubElement(obj, 'nd', ref=str(n))
if "nodes" in el:
for n in el["nodes"]:
etree.SubElement(obj, "nd", ref=str(n))
return etree.tostring(root, pretty_print=True)
if __name__ == '__main__':
if __name__ == "__main__":
if len(sys.argv) < 2:
print(
'Read a JSON from Overpass and output JOSM OSM XML with added stop_area relations'
"Read a JSON from Overpass and output JOSM OSM XML with added "
"stop_area relations"
)
print(
'Usage: {} {{<export.json>|<bbox>}} [output.osm]'.format(
"Usage: {} {{<export.json>|<bbox>}} [output.osm]".format(
sys.argv[0]
)
)
sys.exit(1)
if re.match(r'^[-0-9.,]+$', sys.argv[1]):
if re.match(r"^[-0-9.,]+$", sys.argv[1]):
src = overpass_request(sys.argv[1])
else:
with open(sys.argv[1], 'r') as f:
src = json.load(f)['elements']
with open(sys.argv[1], "r") as f:
src = json.load(f)["elements"]
result = add_stop_areas(src)
if len(sys.argv) < 3:
print(result.decode('utf-8'))
print(result.decode("utf-8"))
else:
with open(sys.argv[2], 'wb') as f:
with open(sys.argv[2], "wb") as f:
f.write(result)

View file

@@ -1,14 +1,15 @@
#!/usr/bin/env python3
import json
import codecs
from lxml import etree
import sys
import kdtree
import json
import math
import re
import sys
import urllib.parse
import urllib.request
import kdtree
from lxml import etree
QUERY = """
[out:json][timeout:250][bbox:{{bbox}}];
@@ -23,17 +24,17 @@ out meta center qt;
def el_id(el):
return el['type'][0] + str(el.get('id', el.get('ref', '')))
return el["type"][0] + str(el.get("id", el.get("ref", "")))
class StationWrapper:
def __init__(self, st):
if 'center' in st:
self.coords = (st['center']['lon'], st['center']['lat'])
elif 'lon' in st:
self.coords = (st['lon'], st['lat'])
if "center" in st:
self.coords = (st["center"]["lon"], st["center"]["lat"])
elif "lon" in st:
self.coords = (st["lon"], st["lat"])
else:
raise Exception('Coordinates not found for station {}'.format(st))
raise Exception("Coordinates not found for station {}".format(st))
self.station = st
def __len__(self):
@@ -44,50 +45,50 @@ class StationWrapper:
def distance(self, other):
"""Calculate distance in meters."""
dx = math.radians(self[0] - other['lon']) * math.cos(
0.5 * math.radians(self[1] + other['lat'])
dx = math.radians(self[0] - other["lon"]) * math.cos(
0.5 * math.radians(self[1] + other["lat"])
)
dy = math.radians(self[1] - other['lat'])
dy = math.radians(self[1] - other["lat"])
return 6378137 * math.sqrt(dx * dx + dy * dy)
def overpass_request(bbox):
url = 'http://overpass-api.de/api/interpreter?data={}'.format(
urllib.parse.quote(QUERY.replace('{{bbox}}', bbox))
url = "http://overpass-api.de/api/interpreter?data={}".format(
urllib.parse.quote(QUERY.replace("{{bbox}}", bbox))
)
response = urllib.request.urlopen(url, timeout=1000)
if response.getcode() != 200:
raise Exception(
'Failed to query Overpass API: HTTP {}'.format(response.getcode())
"Failed to query Overpass API: HTTP {}".format(response.getcode())
)
reader = codecs.getreader('utf-8')
return json.load(reader(response))['elements']
reader = codecs.getreader("utf-8")
return json.load(reader(response))["elements"]
def is_part_of_stop(tags):
if tags.get('public_transport') in ('platform', 'stop_position'):
if tags.get("public_transport") in ("platform", "stop_position"):
return True
if tags.get('railway') == 'platform':
if tags.get("railway") == "platform":
return True
return False
def add_stop_areas(src):
if not src:
raise Exception('Empty dataset provided to add_stop_areas')
raise Exception("Empty dataset provided to add_stop_areas")
# Create a kd-tree out of tram stations
stations = kdtree.create(dimensions=2)
for el in src:
if 'tags' in el and el['tags'].get('railway') == 'tram_stop':
if "tags" in el and el["tags"].get("railway") == "tram_stop":
stations.add(StationWrapper(el))
if stations.is_leaf:
raise Exception('No stations found')
raise Exception("No stations found")
elements = {}
for el in src:
if el.get('tags'):
if el.get("tags"):
elements[el_id(el)] = el
# Populate a list of nearby subway exits and platforms for each station
@@ -96,27 +97,27 @@ def add_stop_areas(src):
for el in src:
# Only tram routes
if (
'tags' not in el
or el['type'] != 'relation'
or el['tags'].get('route') != 'tram'
"tags" not in el
or el["type"] != "relation"
or el["tags"].get("route") != "tram"
):
continue
for m in el['members']:
for m in el["members"]:
if el_id(m) not in elements:
continue
pel = elements[el_id(m)]
if not is_part_of_stop(pel['tags']):
if not is_part_of_stop(pel["tags"]):
continue
if pel['tags'].get('railway') == 'tram_stop':
if pel["tags"].get("railway") == "tram_stop":
continue
coords = pel.get('center', pel)
station = stations.search_nn(
(coords['lon'], coords['lat'])
)[0].data
coords = pel.get("center", pel)
station = stations.search_nn((coords["lon"], coords["lat"]))[
0
].data
if station.distance(coords) < MAX_DISTANCE:
k = (
station.station['id'],
station.station['tags'].get('name', None),
station.station["id"],
station.station["tags"].get("name", None),
)
if k not in stop_areas:
stop_areas[k] = {el_id(station.station): station.station}
@@ -125,11 +126,11 @@ def add_stop_areas(src):
# Find existing stop_area relations for stations and remove these stations
for el in src:
if (
el['type'] == 'relation'
and el['tags'].get('public_transport', None) == 'stop_area'
el["type"] == "relation"
and el["tags"].get("public_transport", None) == "stop_area"
):
found = False
for m in el['members']:
for m in el["members"]:
if found:
break
for st in stop_areas:
@@ -139,81 +140,81 @@ def add_stop_areas(src):
break
# Create OSM XML for new stop_area relations
root = etree.Element('osm', version='0.6')
root = etree.Element("osm", version="0.6")
rid = -1
for st, members in stop_areas.items():
rel = etree.SubElement(root, 'relation', id=str(rid))
rel = etree.SubElement(root, "relation", id=str(rid))
rid -= 1
etree.SubElement(rel, 'tag', k='type', v='public_transport')
etree.SubElement(rel, 'tag', k='public_transport', v='stop_area')
etree.SubElement(rel, "tag", k="type", v="public_transport")
etree.SubElement(rel, "tag", k="public_transport", v="stop_area")
if st[1]:
etree.SubElement(rel, 'tag', k='name', v=st[1])
etree.SubElement(rel, "tag", k="name", v=st[1])
for m in members.values():
etree.SubElement(
rel, 'member', ref=str(m['id']), type=m['type'], role=''
rel, "member", ref=str(m["id"]), type=m["type"], role=""
)
# Add all downloaded elements
for el in src:
obj = etree.SubElement(root, el['type'])
obj = etree.SubElement(root, el["type"])
for a in (
'id',
'type',
'user',
'uid',
'version',
'changeset',
'timestamp',
'lat',
'lon',
"id",
"type",
"user",
"uid",
"version",
"changeset",
"timestamp",
"lat",
"lon",
):
if a in el:
obj.set(a, str(el[a]))
if 'modified' in el:
obj.set('action', 'modify')
if 'tags' in el:
for k, v in el['tags'].items():
etree.SubElement(obj, 'tag', k=k, v=v)
if 'members' in el:
for m in el['members']:
if "modified" in el:
obj.set("action", "modify")
if "tags" in el:
for k, v in el["tags"].items():
etree.SubElement(obj, "tag", k=k, v=v)
if "members" in el:
for m in el["members"]:
etree.SubElement(
obj,
'member',
ref=str(m['ref']),
type=m['type'],
role=m.get('role', ''),
"member",
ref=str(m["ref"]),
type=m["type"],
role=m.get("role", ""),
)
if 'nodes' in el:
for n in el['nodes']:
etree.SubElement(obj, 'nd', ref=str(n))
if "nodes" in el:
for n in el["nodes"]:
etree.SubElement(obj, "nd", ref=str(n))
return etree.tostring(root, pretty_print=True, encoding="utf-8")
if __name__ == '__main__':
if __name__ == "__main__":
if len(sys.argv) < 2:
print(
'Read a JSON from Overpass and output JOSM OSM XML '
'with added stop_area relations'
"Read a JSON from Overpass and output JOSM OSM XML "
"with added stop_area relations"
)
print(
'Usage: {} {{<export.json>|<bbox>}} [output.osm]'.format(
"Usage: {} {{<export.json>|<bbox>}} [output.osm]".format(
sys.argv[0]
)
)
sys.exit(1)
if re.match(r'^[-0-9.,]+$', sys.argv[1]):
bbox = sys.argv[1].split(',')
src = overpass_request(','.join([bbox[i] for i in (1, 0, 3, 2)]))
if re.match(r"^[-0-9.,]+$", sys.argv[1]):
bbox = sys.argv[1].split(",")
src = overpass_request(",".join([bbox[i] for i in (1, 0, 3, 2)]))
else:
with open(sys.argv[1], 'r') as f:
src = json.load(f)['elements']
with open(sys.argv[1], "r") as f:
src = json.load(f)["elements"]
result = add_stop_areas(src)
if len(sys.argv) < 3:
print(result.decode('utf-8'))
print(result.decode("utf-8"))
else:
with open(sys.argv[2], 'wb') as f:
with open(sys.argv[2], "wb") as f:
f.write(result)


@@ -1,28 +1,30 @@
#!/usr/bin/env python3
from flask import Flask, request, make_response, render_template
from flask import Flask, make_response, render_template, request
from make_stop_areas import add_stop_areas, overpass_request
app = Flask(__name__)
app.debug = True
@app.route('/')
@app.route("/")
def form():
return render_template('index.html')
return render_template("index.html")
@app.route('/process', methods=['GET'])
@app.route("/process", methods=["GET"])
def convert():
src = overpass_request(request.args.get('bbox'))
src = overpass_request(request.args.get("bbox"))
if not src:
return 'No data from overpass, sorry.'
return "No data from overpass, sorry."
result = add_stop_areas(src)
response = make_response(result)
response.headers['Content-Disposition'] = (
'attachment; filename="stop_areas.osm"'
)
response.headers[
"Content-Disposition"
] = 'attachment; filename="stop_areas.osm"'
return response
if __name__ == '__main__':
if __name__ == "__main__":
app.run()
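For a quick local check, Flask's built-in test client exercises both routes without a real server (the bbox value is a placeholder, and /process still needs Overpass access to return data):

client = app.test_client()
resp = client.get("/process", query_string={"bbox": "37.5,55.7,37.7,55.8"})
print(resp.status_code, resp.headers.get("Content-Disposition"))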


@@ -12,33 +12,33 @@ def load_xml(f):
elements = []
for event, element in etree.iterparse(f):
if element.tag in ('node', 'way', 'relation'):
el = {'type': element.tag, 'id': int(element.get('id'))}
if element.tag == 'node':
for n in ('lat', 'lon'):
if element.tag in ("node", "way", "relation"):
el = {"type": element.tag, "id": int(element.get("id"))}
if element.tag == "node":
for n in ("lat", "lon"):
el[n] = float(element.get(n))
tags = {}
nd = []
members = []
for sub in element:
if sub.tag == 'tag':
tags[sub.get('k')] = sub.get('v')
elif sub.tag == 'nd':
nd.append(int(sub.get('ref')))
elif sub.tag == 'member':
if sub.tag == "tag":
tags[sub.get("k")] = sub.get("v")
elif sub.tag == "nd":
nd.append(int(sub.get("ref")))
elif sub.tag == "member":
members.append(
{
'type': sub.get('type'),
'ref': int(sub.get('ref')),
'role': sub.get('role', ''),
"type": sub.get("type"),
"ref": int(sub.get("ref")),
"role": sub.get("role", ""),
}
)
if tags:
el['tags'] = tags
el["tags"] = tags
if nd:
el['nodes'] = nd
el["nodes"] = nd
if members:
el['members'] = members
el["members"] = members
elements.append(el)
element.clear()
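load_xml() streams the document with iterparse() and clears each element once converted, so memory stays bounded on large files; the dicts it builds mirror the Overpass JSON elements used elsewhere. A minimal sketch, assuming the function returns the elements list as this hunk suggests:

import io

sample = io.BytesIO(
    b"<osm><node id='1' lat='55.75' lon='37.61'>"
    b"<tag k='railway' v='tram_stop'/></node></osm>"
)
print(load_xml(sample))
# [{'type': 'node', 'id': 1, 'lat': 55.75, 'lon': 37.61,
#   'tags': {'railway': 'tram_stop'}}]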
@@ -55,7 +55,7 @@ def _get_yaml_compatible_string(scalar):
if string and (
string[0] in _YAML_SPECIAL_CHARACTERS
or any(seq in string for seq in _YAML_SPECIAL_SEQUENCES)
or string.endswith(':')
or string.endswith(":")
):
string = string.replace("'", "''")
string = "'{}'".format(string)
@@ -63,25 +63,25 @@ def _get_yaml_compatible_string(scalar):
def dump_yaml(city, f):
def write_yaml(data, f, indent=''):
def write_yaml(data, f, indent=""):
if isinstance(data, (set, list)):
f.write('\n')
f.write("\n")
for i in data:
f.write(indent)
f.write('- ')
write_yaml(i, f, indent + ' ')
f.write("- ")
write_yaml(i, f, indent + " ")
elif isinstance(data, dict):
f.write('\n')
f.write("\n")
for k, v in data.items():
if v is None:
continue
f.write(indent + _get_yaml_compatible_string(k) + ': ')
write_yaml(v, f, indent + ' ')
f.write(indent + _get_yaml_compatible_string(k) + ": ")
write_yaml(v, f, indent + " ")
if isinstance(v, (list, set, dict)):
f.write('\n')
f.write("\n")
else:
f.write(_get_yaml_compatible_string(data))
f.write('\n')
f.write("\n")
INCLUDE_STOP_AREAS = False
stops = set()
@@ -91,14 +91,14 @@ def dump_yaml(city, f):
[(sa.transfer or sa.id, sa.name) for sa in route.stop_areas()]
)
rte = {
'type': route.mode,
'ref': route.ref,
'name': route.name,
'colour': route.colour,
'infill': route.infill,
'station_count': len(stations),
'stations': list(stations.values()),
'itineraries': {},
"type": route.mode,
"ref": route.ref,
"name": route.name,
"colour": route.colour,
"infill": route.infill,
"station_count": len(stations),
"stations": list(stations.values()),
"itineraries": {},
}
for variant in route:
if INCLUDE_STOP_AREAS:
@@ -107,33 +107,33 @@ def dump_yaml(city, f):
s = st.stoparea
if s.id == s.station.id:
v_stops.append(
'{} ({})'.format(s.station.name, s.station.id)
"{} ({})".format(s.station.name, s.station.id)
)
else:
v_stops.append(
'{} ({}) in {} ({})'.format(
"{} ({}) in {} ({})".format(
s.station.name, s.station.id, s.name, s.id
)
)
else:
v_stops = [
'{} ({})'.format(
"{} ({})".format(
s.stoparea.station.name, s.stoparea.station.id
)
for s in variant
]
rte['itineraries'][variant.id] = v_stops
rte["itineraries"][variant.id] = v_stops
stops.update(v_stops)
routes.append(rte)
transfers = []
for t in city.transfers:
v_stops = ['{} ({})'.format(s.name, s.id) for s in t]
v_stops = ["{} ({})".format(s.name, s.id) for s in t]
transfers.append(sorted(v_stops))
result = {
'stations': sorted(stops),
'transfers': sorted(transfers, key=lambda t: t[0]),
'routes': sorted(routes, key=lambda r: r['ref']),
"stations": sorted(stops),
"transfers": sorted(transfers, key=lambda t: t[0]),
"routes": sorted(routes, key=lambda r: r["ref"]),
}
write_yaml(result, f)
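For reference, the result structure handed to write_yaml() has roughly this shape (names and ids invented; None values, e.g. a missing colour, are skipped by the writer):

result = {
    "stations": ["Baker Street (n123)"],
    "transfers": [["Baker Street (n123)", "Regent's Park (n456)"]],
    "routes": [
        {
            "type": "subway",
            "ref": "1",
            "name": "Line 1",
            "colour": "#ff0000",
            "infill": None,
            "station_count": 1,
            "stations": ["Baker Street (n123)"],
            "itineraries": {"r100": ["Baker Street (n123)"]},
        }
    ],
}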
@@ -154,15 +154,15 @@ def make_geojson(city, include_tracks_geometry=True):
)
features.append(
{
'type': 'Feature',
'geometry': {
'type': 'LineString',
'coordinates': tracks,
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": tracks,
},
'properties': {
'ref': variant.ref,
'name': variant.name,
'stroke': variant.colour,
"properties": {
"ref": variant.ref,
"name": variant.name,
"stroke": variant.colour,
},
}
)
@@ -173,41 +173,41 @@ def make_geojson(city, include_tracks_geometry=True):
for stop in stops:
features.append(
{
'type': 'Feature',
'geometry': {
'type': 'Point',
'coordinates': stop,
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": stop,
},
'properties': {
'marker-size': 'small',
'marker-symbol': 'circle',
"properties": {
"marker-size": "small",
"marker-symbol": "circle",
},
}
)
for stoparea in stopareas:
features.append(
{
'type': 'Feature',
'geometry': {
'type': 'Point',
'coordinates': stoparea.center,
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": stoparea.center,
},
'properties': {
'name': stoparea.name,
'marker-size': 'small',
'marker-color': '#ff2600'
"properties": {
"name": stoparea.name,
"marker-size": "small",
"marker-color": "#ff2600"
if stoparea in transfers
else '#797979',
else "#797979",
},
}
)
return {'type': 'FeatureCollection', 'features': features}
return {"type": "FeatureCollection", "features": features}
def _dumps_route_id(route_id):
"""Argument is a route_id that depends on route colour and ref. Name
can be taken from route_master or can be route's own, we don't take it
into consideration. Some of route attributes can be None. The function makes
"""Argument is a route_id that depends on route colour and ref. Name can
be taken from route_master or can be route's own, we don't take it into
consideration. Some of route attributes can be None. The function makes
route_id json-compatible - dumps it to a string."""
return json.dumps(route_id, ensure_ascii=False)
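Per the docstring, route_id is a tuple of nullable attributes such as colour and ref (exact layout assumed here), and dumping it yields a stable, hashable JSON key:

print(_dumps_route_id(("#ff0000", "5")))  # ["#ff0000", "5"]
print(_dumps_route_id((None, "5")))       # [null, "5"]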
@@ -224,7 +224,7 @@ def read_recovery_data(path):
shuffled stations in routes."""
data = None
try:
with open(path, 'r') as f:
with open(path, "r") as f:
try:
data = json.load(f)
except json.decoder.JSONDecodeError as e:
@@ -258,21 +258,21 @@ def write_recovery_data(path, current_data, cities):
itineraries = []
for variant in route:
itin = {
'stations': [],
'name': variant.name,
'from': variant.element['tags'].get('from'),
'to': variant.element['tags'].get('to'),
"stations": [],
"name": variant.name,
"from": variant.element["tags"].get("from"),
"to": variant.element["tags"].get("to"),
}
for stop in variant:
station = stop.stoparea.station
station_name = station.name
if station_name == '?' and station.int_name:
if station_name == "?" and station.int_name:
station_name = station.int_name
itin['stations'].append(
itin["stations"].append(
{
'oms_id': station.id,
'name': station_name,
'center': station.center,
"oms_id": station.id,
"name": station_name,
"center": station.center,
}
)
if itin is not None:
@@ -293,7 +293,7 @@ def write_recovery_data(path, current_data, cities):
}
for city_name, routes in data.items()
}
with open(path, 'w', encoding='utf-8') as f:
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
logging.warning("Cannot write recovery data to '%s': %s", path, str(e))

File diff suppressed because it is too large

@@ -55,7 +55,6 @@ sample_networks = {
"positions_on_rails": [],
},
},
"Only 2 stations connected with rails": {
"xml": """<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>
@@ -125,7 +124,6 @@ sample_networks = {
"positions_on_rails": [[0], [1]],
},
},
"Only 6 stations, no rails": {
"xml": """<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>
@@ -214,7 +212,6 @@ sample_networks = {
"positions_on_rails": [],
},
},
"One rail line connecting all stations": {
"xml": """<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>
@@ -328,7 +325,6 @@ sample_networks = {
"positions_on_rails": [[0], [1], [2], [3], [4], [5]],
},
},
"One rail line connecting all stations except the last": {
"xml": """<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>
@@ -439,7 +435,6 @@ sample_networks = {
"positions_on_rails": [[0], [1], [2], [3], [4]],
},
},
"One rail line connecting all stations except the fist": {
"xml": """<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>
@@ -550,7 +545,6 @@ sample_networks = {
"positions_on_rails": [[0], [1], [2], [3], [4]],
},
},
"One rail line connecting all stations except the fist and the last": {
"xml": """<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>
@@ -658,7 +652,6 @@ sample_networks = {
"positions_on_rails": [[0], [1], [2], [3]],
},
},
"One rail line connecting only 2 first stations": {
"xml": """<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>
@@ -760,7 +753,6 @@ sample_networks = {
"positions_on_rails": [[0], [1]],
},
},
"One rail line connecting only 2 last stations": {
"xml": """<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>
@@ -862,7 +854,6 @@ sample_networks = {
"positions_on_rails": [[0], [1]],
},
},
"One rail connecting all stations and protruding at both ends": {
"xml": """<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>
@@ -986,8 +977,10 @@ sample_networks = {
"positions_on_rails": [[1], [2], [3], [4], [5], [6]],
},
},
"Several rails with reversed order for backward route, connecting all stations and protruding at both ends": {
(
"Several rails with reversed order for backward route, "
"connecting all stations and protruding at both ends"
): {
"xml": """<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>
<node id='1' version='1' lat='0.0' lon='0.0'>
@@ -1116,8 +1109,10 @@ sample_networks = {
"positions_on_rails": [[1], [2], [3], [4], [5], [6]],
},
},
"One rail laying near all stations requiring station projecting, protruding at both ends": {
(
"One rail laying near all stations requiring station projecting, "
"protruding at both ends"
): {
"xml": """<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>
<node id='1' version='1' lat='0.0001' lon='0.0'>
@@ -1210,15 +1205,28 @@ sample_networks = {
"forward": {
"first_stop_on_rails_index": 0,
"last_stop_on_rails_index": 5,
"positions_on_rails": [[1/7], [2/7], [3/7], [4/7], [5/7], [6/7]],
"positions_on_rails": [
[1 / 7],
[2 / 7],
[3 / 7],
[4 / 7],
[5 / 7],
[6 / 7],
],
},
"backward": {
"first_stop_on_rails_index": 0,
"last_stop_on_rails_index": 5,
"positions_on_rails": [[1/7], [2/7], [3/7], [4/7], [5/7], [6/7]],
"positions_on_rails": [
[1 / 7],
[2 / 7],
[3 / 7],
[4 / 7],
[5 / 7],
[6 / 7],
],
},
},
"One rail laying near all stations except the first and last": {
"xml": """<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>
@@ -1314,15 +1322,14 @@ sample_networks = {
"forward": {
"first_stop_on_rails_index": 1,
"last_stop_on_rails_index": 4,
"positions_on_rails": [[0], [1/3], [2/3], [1]],
"positions_on_rails": [[0], [1 / 3], [2 / 3], [1]],
},
"backward": {
"first_stop_on_rails_index": 1,
"last_stop_on_rails_index": 4,
"positions_on_rails": [[0], [1/3], [2/3], [1]],
"positions_on_rails": [[0], [1 / 3], [2 / 3], [1]],
},
},
"Circle route without rails": {
"xml": """<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>
@@ -1391,7 +1398,6 @@ sample_networks = {
"positions_on_rails": [],
},
},
"Circle route with closed rail line connecting all stations": {
"xml": """<?xml version='1.0' encoding='UTF-8'?>
<osm version='0.6' generator='JOSM'>


@@ -25,7 +25,7 @@ class TestOneRouteTracks(unittest.TestCase):
"name": "Null Island",
"country": "World",
"continent": "Africa",
"num_stations": None, # Would be taken from the sample network data under testing
"num_stations": None, # Would be taken from the sample network data
"num_lines": 1,
"num_light_lines": 0,
"num_interchanges": 0,
@@ -127,11 +127,11 @@ class TestOneRouteTracks(unittest.TestCase):
f"Wrong {attr} for {route_label} route",
)
first_index = route_data["first_stop_on_rails_index"]
last_index = route_data["last_stop_on_rails_index"]
first_ind = route_data["first_stop_on_rails_index"]
last_ind = route_data["last_stop_on_rails_index"]
positions_on_rails = [
rs.positions_on_rails
for rs in route.stops[first_index : last_index + 1]
for rs in route.stops[first_ind : last_ind + 1]  # noqa: E203
]
self.assertListAlmostEqual(
positions_on_rails, route_data["positions_on_rails"]
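The noqa above works around a known black/flake8 conflict: black formats extended slices with spaces around the colon, which flake8's E203 ("whitespace before ':'") then flags. Reduced to a standalone line:

items = list(range(10))
start, stop = 2, 5
print(items[start : stop + 1])  # noqa: E203, black's slice style trips E203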


@@ -60,8 +60,8 @@ class TestGTFS(TestCase):
)
def test__dict_to_row__numeric_values(self) -> None:
"""Test that zero numeric values remain zeros in dict_to_row() function,
and not empty strings or None.
"""Test that zero numeric values remain zeros in dict_to_row()
function, and not empty strings or None.
"""
shapes = [


@@ -133,7 +133,11 @@ class TestProjection(unittest.TestCase):
"""The tested function should accept points as any consecutive
container with index operator.
"""
types = (tuple, list, collections.deque,)
types = (
tuple,
list,
collections.deque,
)
point = (0, 0.5)
segment_end1 = (0, 0)


@@ -1,7 +1,19 @@
validator_osm_wiki_url = (
"https://wiki.openstreetmap.org/wiki/Quality_assurance#subway-preprocessor"
)
github_url = "https://github.com/alexey-zakharenkov/subways"
produced_by = f"""Produced by
<a href="{github_url}">Subway Preprocessor</a> on {{date}}."""
metro_mapping_osm_article = "https://wiki.openstreetmap.org/wiki/Metro_Mapping"
list_of_metro_systems_url = (
"https://en.wikipedia.org/wiki/List_of_metro_systems#List"
)
# These are templates for validation_to_html.py
# Variables should be in curly braces
STYLE = '''
STYLE = """
<style>
body {
font-family: sans-serif;
@@ -98,7 +110,7 @@ td > div {
}
.tooltip:hover:before,.tooltip:hover:after {
opacity: 1;
visibility: visible
visibility: visible
}
footer {
background: white;
@@ -108,30 +120,31 @@ footer {
position: sticky;
}
</style>
'''
"""
INDEX_HEADER = '''
INDEX_HEADER = f"""
<!doctype html>
<html>
<head>
<title>Subway Validator</title>
<meta charset="utf-8">
(s)
{STYLE}
</head>
<body>
<main>
<h1>Subway Validation Results</h1>
<p><b>{good_cities}</b> of <b>{total_cities}</b> networks validated without errors.
To make a network validate successfully please follow the
<a href="https://wiki.openstreetmap.org/wiki/Metro_Mapping">metro mapping instructions</a>.
Commit your changes to the OSM and then check back to the updated validation results after the next validation cycle, please.
See <a href="https://wiki.openstreetmap.org/wiki/Quality_assurance#subway-preprocessor">the validator instance&#0040;s&#0041; description</a>
for the schedule and capabilities.</p>
<p><b>{{good_cities}}</b> of <b>{{total_cities}}</b> networks validated without
errors. To make a network validate successfully please follow the
<a href="{metro_mapping_osm_article}">metro mapping
instructions</a>. Commit your changes to the OSM and then check back to the
updated validation results after the next validation cycle, please.
See <a href="{validator_osm_wiki_url}">the validator instance&#0040;s&#0041;
description</a> for the schedule and capabilities.</p>
<p><a href="render.html">View networks on a map</a></p>
<table cellspacing="3" cellpadding="2" style="margin-bottom: 1em;">
'''.replace('(s)', STYLE)
"""
INDEX_CONTINENT = '''
INDEX_CONTINENT = """
<tr><td colspan="9">&nbsp;</td></tr>
<tr>
<th>Continent</th>
@@ -157,9 +170,9 @@ INDEX_CONTINENT = '''
<td class="color{=notices}">{num_notices}</td>
</tr>
{content}
'''
"""
INDEX_COUNTRY = '''
INDEX_COUNTRY = """
<tr>
<td>&nbsp;</td>
<td class="bold color{=cities}"><a href="{file}">{country}</a></td>
@@ -172,56 +185,58 @@ INDEX_COUNTRY = '''
<td class="color{=warnings}">{num_warnings}</td>
<td class="color{=notices}">{num_notices}</td>
</tr>
'''
"""
INDEX_FOOTER = '''
INDEX_FOOTER = f"""
</table>
</main>
<footer>Produced by <a href="https://github.com/alexey-zakharenkov/subways">Subway Preprocessor</a> on {date}.
See <a href="{google}">this spreadsheet</a> for the reference metro statistics and
<a href="https://en.wikipedia.org/wiki/List_of_metro_systems#List">this wiki page</a> for a list
of all metro systems.</footer>
<footer>{produced_by}
See <a href="{{google}}">this spreadsheet</a> for the reference
metro statistics and
<a href="{list_of_metro_systems_url}">
this wiki page</a> for a list of all metro systems.</footer>
</body>
</html>
'''
"""
COUNTRY_HEADER = '''
COUNTRY_HEADER = f"""
<!doctype html>
<html>
<head>
<title>Subway Validator: {country}</title>
<title>Subway Validator: {{country}}</title>
<meta charset="utf-8">
(s)
{STYLE}
</head>
<body>
<main>
<h1>Subway Validation Results for {country}</h1>
<h1>Subway Validation Results for {{country}}</h1>
<p><a href="index.html">Return to the countries list</a>.</p>
<table cellspacing="3" cellpadding="2">
<tr>
<th>City</th>
{?subways}
{{?subways}}
<th>Subway Lines</th>
<th>Light Rail Lines</th>
{end}{?overground}
{{end}}{{?overground}}
<th>Tram Lines</th>
<th>Bus Lines</th>
<th>T-Bus Lines</th>
<th>Other Lines</th>
{end}
{{end}}
<th>Stations</th>
<th>Interchanges</th>
<th>Unused Entrances</th>
</tr>
'''.replace('(s)', STYLE)
"""
COUNTRY_CITY = '''
COUNTRY_CITY = """
<tr id="{slug}">
<td class="bold color{good_cities}">
{city}
{?yaml}<a href="{yaml}" class="hlink" title="Download YAML">Y</a>{end}
{?json}<a href="{json}" class="hlink" title="Download GeoJSON">J</a>{end}
{?json}<a href="render.html#{slug}" class="hlink" title="View map" target="_blank">M</a>{end}
{?json}<a href="render.html#{slug}" class="hlink" title="View map"
target="_blank">M</a>{end}
</td>
{?subways}
<td class="color{=subwayl}">sub: {subwayl_found} / {subwayl_expected}</td>
@@ -229,36 +244,55 @@ COUNTRY_CITY = '''
{end}{?overground}
<td class="color{=traml}">t: {traml_found} / {traml_expected}</td>
<td class="color{=busl}">b: {busl_found} / {busl_expected}</td>
<td class="color{=trolleybusl}">tb: {trolleybusl_found} / {trolleybusl_expected}</td>
<td class="color{=trolleybusl}">
tb: {trolleybusl_found} / {trolleybusl_expected}
</td>
<td class="color{=otherl}">o: {otherl_found} / {otherl_expected}</td>
{end}
<td class="color{=stations}">st: {stations_found} / {stations_expected}</td>
<td class="color{=transfers}">int: {transfers_found} / {transfers_expected}</td>
<td class="color{=transfers}">
int: {transfers_found} / {transfers_expected}
</td>
<td class="color{=entrances}">ent: {unused_entrances}</td>
</tr>
<tr><td colspan="{?subways}6{end}{?overground}8{end}">
{?errors}
<div class="errors"><div data-text="Network is invalid and not suitable for routing." class="tooltip">🛑 Errors</div>
<div class="errors">
<div
data-text="Network is invalid and not suitable for routing."
class="tooltip">
🛑 Errors
</div>
{errors}
</div>
{end}
{?warnings}
<div class="warnings"><div data-text="Problematic data but it's still possible to build routes." class="tooltip"> Warnings</div>
<div class="warnings">
<div
data-text="Problematic data but it's still possible to build routes."
class="tooltip">
⚠️ Warnings
</div>
{warnings}
</div>
{end}
{?notices}
<div class="notices"><div data-text="Suspicious condition but not necessarily an error." class="tooltip"> Notices</div>
<div class="notices">
<div
data-text="Suspicious condition but not necessarily an error."
class="tooltip">
ℹ️ Notices
</div>
{notices}
{end}
</div>
</td></tr>
'''
"""
COUNTRY_FOOTER = '''
COUNTRY_FOOTER = f"""
</table>
</main>
<footer>Produced by <a href="https://github.com/alexey-zakharenkov/subways">Subway Preprocessor</a> on {date}.</footer>
<footer>{produced_by}</footer>
</body>
</html>
'''
"""


@@ -1,38 +1,47 @@
#!/usr/bin/env python3
import datetime
import re
import os
import sys
import json
import os
import re
import sys
from subway_structure import SPREADSHEET_ID
from v2h_templates import *
from v2h_templates import (
COUNTRY_CITY,
COUNTRY_FOOTER,
COUNTRY_HEADER,
INDEX_CONTINENT,
INDEX_COUNTRY,
INDEX_FOOTER,
INDEX_HEADER,
)
class CityData:
def __init__(self, city=None):
self.city = city is not None
self.data = {
'good_cities': 0,
'total_cities': 1 if city else 0,
'num_errors': 0,
'num_warnings': 0,
'num_notices': 0
"good_cities": 0,
"total_cities": 1 if city else 0,
"num_errors": 0,
"num_warnings": 0,
"num_notices": 0,
}
self.slug = None
if city:
self.slug = city['slug']
self.country = city['country']
self.continent = city['continent']
self.errors = city['errors']
self.warnings = city['warnings']
self.notices = city['notices']
self.slug = city["slug"]
self.country = city["country"]
self.continent = city["continent"]
self.errors = city["errors"]
self.warnings = city["warnings"]
self.notices = city["notices"]
if not self.errors:
self.data['good_cities'] = 1
self.data['num_errors'] = len(self.errors)
self.data['num_warnings'] = len(self.warnings)
self.data['num_notices'] = len(self.notices)
self.data["good_cities"] = 1
self.data["num_errors"] = len(self.errors)
self.data["num_warnings"] = len(self.warnings)
self.data["num_notices"] = len(self.notices)
for k, v in city.items():
if 'found' in k or 'expected' in k or 'unused' in k:
if "found" in k or "expected" in k or "unused" in k:
self.data[k] = v
def not__get__(self, i):
@@ -49,37 +58,37 @@ class CityData:
def format(self, s):
def test_eq(v1, v2):
return '1' if v1 == v2 else '0'
return "1" if v1 == v2 else "0"
for k in self.data:
s = s.replace('{' + k + '}', str(self.data[k]))
s = s.replace('{slug}', self.slug or '')
s = s.replace("{" + k + "}", str(self.data[k]))
s = s.replace("{slug}", self.slug or "")
for k in (
'subwayl',
'lightrl',
'stations',
'transfers',
'busl',
'trolleybusl',
'traml',
'otherl',
"subwayl",
"lightrl",
"stations",
"transfers",
"busl",
"trolleybusl",
"traml",
"otherl",
):
if k + '_expected' in self.data:
if k + "_expected" in self.data:
s = s.replace(
'{=' + k + '}',
"{=" + k + "}",
test_eq(
self.data[k + '_found'], self.data[k + '_expected']
self.data[k + "_found"], self.data[k + "_expected"]
),
)
s = s.replace(
'{=cities}',
test_eq(self.data['good_cities'], self.data['total_cities']),
"{=cities}",
test_eq(self.data["good_cities"], self.data["total_cities"]),
)
s = s.replace(
'{=entrances}', test_eq(self.data['unused_entrances'], 0)
"{=entrances}", test_eq(self.data["unused_entrances"], 0)
)
for k in ('errors', 'warnings', 'notices'):
s = s.replace('{=' + k + '}', test_eq(self.data['num_' + k], 0))
for k in ("errors", "warnings", "notices"):
s = s.replace("{=" + k + "}", test_eq(self.data["num_" + k], 0))
return s
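In format() above, the {=key} placeholders become "1" or "0", which the color1/color0 CSS classes (presumably defined in STYLE above) turn into pass/fail cell colouring. The mechanism in isolation:

def test_eq(v1, v2):
    return "1" if v1 == v2 else "0"

cell = '<td class="color{=stations}">'
print(cell.replace("{=stations}", test_eq(5, 5)))  # <td class="color1">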
@@ -89,27 +98,27 @@ def tmpl(s, data=None, **kwargs):
if kwargs:
for k, v in kwargs.items():
if v is not None:
s = s.replace('{' + k + '}', str(v))
s = s.replace("{" + k + "}", str(v))
s = re.sub(
r'\{\?' + k + r'\}(.+?)\{end\}',
r'\1' if v else '',
r"\{\?" + k + r"\}(.+?)\{end\}",
r"\1" if v else "",
s,
flags=re.DOTALL,
)
s = s.replace('{date}', date)
s = s.replace("{date}", date)
google_url = (
'https://docs.google.com/spreadsheets/d/{}/edit?usp=sharing'.format(
"https://docs.google.com/spreadsheets/d/{}/edit?usp=sharing".format(
SPREADSHEET_ID
)
)
s = s.replace('{google}', google_url)
s = s.replace("{google}", google_url)
return s
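tmpl() keeps or drops the {?key}...{end} blocks depending on the truthiness of the corresponding keyword argument; the regex on its own:

import re

s = '{?json}<a href="city.geojson">J</a>{end}'
for v in ("city.geojson", None):
    print(re.sub(r"\{\?json\}(.+?)\{end\}", r"\1" if v else "", s, flags=re.DOTALL))
# prints the inner markup first, then an empty string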
EXPAND_OSM_TYPE = {'n': 'node', 'w': 'way', 'r': 'relation'}
RE_SHORT = re.compile(r'\b([nwr])(\d+)\b')
RE_FULL = re.compile(r'\b(node|way|relation) (\d+)\b')
RE_COORDS = re.compile(r'\((-?\d+\.\d+), (-?\d+\.\d+)\)')
EXPAND_OSM_TYPE = {"n": "node", "w": "way", "r": "relation"}
RE_SHORT = re.compile(r"\b([nwr])(\d+)\b")
RE_FULL = re.compile(r"\b(node|way|relation) (\d+)\b")
RE_COORDS = re.compile(r"\((-?\d+\.\d+), (-?\d+\.\d+)\)")
def osm_links(s):
@@ -123,25 +132,26 @@ def osm_links(s):
s = RE_SHORT.sub(link, s)
s = RE_FULL.sub(link, s)
s = RE_COORDS.sub(
r'(<a href="https://www.openstreetmap.org/search?query=\2%2C\1#map=18/\2/\1">pos</a>)',
r'(<a href="https://www.openstreetmap.org/search?'
r'query=\2%2C\1#map=18/\2/\1">pos</a>)',
s,
)
return s
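The link() helper used by osm_links() sits just above this hunk; here is a stand-in with the same effect (the URL scheme is the standard osm.org object path), showing how short n/w/r references become links:

import re

EXPAND_OSM_TYPE = {"n": "node", "w": "way", "r": "relation"}
RE_SHORT = re.compile(r"\b([nwr])(\d+)\b")

def link(m):
    return '<a href="https://www.openstreetmap.org/{}/{}">{}</a>'.format(
        EXPAND_OSM_TYPE[m.group(1)], m.group(2), m.group(0)
    )

print(RE_SHORT.sub(link, "Station n240327471 has no stop_area"))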
def esc(s):
return s.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
if len(sys.argv) < 2:
print('Reads a log from subway validator and prepares HTML files.')
print("Reads a log from subway validator and prepares HTML files.")
print(
'Usage: {} <validation.log> [<target_directory>]'.format(sys.argv[0])
"Usage: {} <validation.log> [<target_directory>]".format(sys.argv[0])
)
sys.exit(1)
with open(sys.argv[1], 'r', encoding='utf-8') as f:
data = {c['name']: CityData(c) for c in json.load(f)}
with open(sys.argv[1], "r", encoding="utf-8") as f:
data = {c["name"]: CityData(c) for c in json.load(f)}
countries = {}
continents = {}
@@ -154,16 +164,16 @@ for c in data.values():
c_by_c[c.continent].add(c.country)
world = sum(continents.values(), CityData())
overground = 'traml_expected' in next(iter(data.values())).data
date = datetime.datetime.utcnow().strftime('%d.%m.%Y %H:%M UTC')
path = '.' if len(sys.argv) < 3 else sys.argv[2]
index = open(os.path.join(path, 'index.html'), 'w', encoding='utf-8')
overground = "traml_expected" in next(iter(data.values())).data
date = datetime.datetime.utcnow().strftime("%d.%m.%Y %H:%M UTC")
path = "." if len(sys.argv) < 3 else sys.argv[2]
index = open(os.path.join(path, "index.html"), "w", encoding="utf-8")
index.write(tmpl(INDEX_HEADER, world))
for continent in sorted(continents.keys()):
content = ''
content = ""
for country in sorted(c_by_c[continent]):
country_file_name = country.lower().replace(' ', '-') + '.html'
country_file_name = country.lower().replace(" ", "-") + ".html"
content += tmpl(
INDEX_COUNTRY,
countries[country],
@@ -172,7 +182,7 @@ for continent in sorted(continents.keys()):
continent=continent,
)
country_file = open(
os.path.join(path, country_file_name), 'w', encoding='utf-8'
os.path.join(path, country_file_name), "w", encoding="utf-8"
)
country_file.write(
tmpl(
@@ -187,18 +197,22 @@ for continent in sorted(continents.keys()):
if city.country == country:
file_base = os.path.join(path, city.slug)
yaml_file = (
city.slug + '.yaml'
if os.path.exists(file_base + '.yaml')
city.slug + ".yaml"
if os.path.exists(file_base + ".yaml")
else None
)
json_file = (
city.slug + '.geojson'
if os.path.exists(file_base + '.geojson')
city.slug + ".geojson"
if os.path.exists(file_base + ".geojson")
else None
)
errors = '<br>'.join([osm_links(esc(e)) for e in city.errors])
warnings = '<br>'.join([osm_links(esc(w)) for w in city.warnings])
notices = '<br>'.join([osm_links(esc(n)) for n in city.notices])
errors = "<br>".join([osm_links(esc(e)) for e in city.errors])
warnings = "<br>".join(
[osm_links(esc(w)) for w in city.warnings]
)
notices = "<br>".join(
[osm_links(esc(n)) for n in city.notices]
)
country_file.write(
tmpl(
COUNTRY_CITY,