#!/usr/bin/env python3
"""Build JOSM OSM XML with proposed stop_area relations for tram stops.

The input is either an Overpass JSON export or a bounding box that is used
to query the Overpass API. Platforms and stop positions that are members of
tram routes are grouped around the nearest ``railway=tram_stop`` element, and
a new ``public_transport=stop_area`` relation is generated for each group
that is not already covered by an existing stop_area relation.
"""
import codecs
import json
import math
import re
import sys
import urllib.parse
import urllib.request

import kdtree
from lxml import etree

QUERY = """
[out:json][timeout:250][bbox:{{bbox}}];
(
  relation["route"="tram"];
  relation["public_transport"="stop_area"];
  nwr["railway"="tram_stop"];
);
(._;>>;);
out meta center qt;
"""


def el_id(el):
    """Return a short unique id such as ``n123`` / ``w45`` / ``r6``.

    Works both for downloaded elements (which carry ``id``) and for relation
    members (which carry ``ref``). The prefix is the first letter of the
    element type.
    """
    return el["type"][0] + str(el.get("id", el.get("ref", "")))


class StationWrapper:
    """Adapter exposing a station element as a 2-D point for the kd-tree.

    Indexing yields ``(lon, lat)``; the wrapped element is kept in
    ``self.station``.
    """

    def __init__(self, st):
        """Wrap element *st*, taking coordinates from ``center`` or itself.

        Raises:
            Exception: if the element has neither ``center`` nor ``lon``.
        """
        if "center" in st:
            self.coords = (st["center"]["lon"], st["center"]["lat"])
        elif "lon" in st:
            self.coords = (st["lon"], st["lat"])
        else:
            raise Exception("Coordinates not found for station {}".format(st))
        self.station = st

    def __len__(self):
        # The kd-tree is created with dimensions=2.
        return 2

    def __getitem__(self, i):
        return self.coords[i]

    def distance(self, other):
        """Calculate distance in meters.

        *other* is a mapping with ``lon`` and ``lat`` keys. Uses the
        equirectangular approximation, which is adequate for the short
        distances compared here.
        """
        dx = math.radians(self[0] - other["lon"]) * math.cos(
            0.5 * math.radians(self[1] + other["lat"])
        )
        dy = math.radians(self[1] - other["lat"])
        # 6378137 is the Earth's equatorial radius in meters.
        return 6378137 * math.sqrt(dx * dx + dy * dy)


def overpass_request(bbox):
    """Run QUERY against the Overpass API and return the ``elements`` list.

    Args:
        bbox: comma-separated bounding box string substituted into the
            query's ``[bbox:...]`` setting.

    Raises:
        Exception: if the server answers with a status other than 200.
    """
    url = "http://overpass-api.de/api/interpreter?data={}".format(
        urllib.parse.quote(QUERY.replace("{{bbox}}", bbox))
    )
    # The context manager guarantees the connection is closed even when
    # decoding fails or the status check raises.
    with urllib.request.urlopen(url, timeout=1000) as response:
        if response.getcode() != 200:
            raise Exception(
                "Failed to query Overpass API: HTTP {}".format(
                    response.getcode()
                )
            )
        reader = codecs.getreader("utf-8")
        return json.load(reader(response))["elements"]


def is_part_of_stop(tags):
    """Return True if *tags* describe a platform or a stop position."""
    return (
        tags.get("public_transport") in ("platform", "stop_position")
        or tags.get("railway") == "platform"
    )


def add_stop_areas(src):
    """Return OSM XML (bytes) with new stop_area relations for tram stops.

    Args:
        src: list of element dicts as found under ``elements`` in Overpass
            JSON output.

    Returns:
        UTF-8 encoded, pretty-printed OSM XML containing the generated
        relations (with negative ids) followed by all elements of *src*.

    Raises:
        Exception: if *src* is empty or contains no tram stops.
    """
    if not src:
        raise Exception("Empty dataset provided to add_stop_areas")

    # Create a kd-tree out of tram stations
    stations = kdtree.create(dimensions=2)
    for el in src:
        if "tags" in el and el["tags"].get("railway") == "tram_stop":
            stations.add(StationWrapper(el))

    if stations.is_leaf:
        raise Exception("No stations found")

    # Index tagged elements by short id for member lookups.
    elements = {el_id(el): el for el in src if el.get("tags")}

    # Collect nearby platforms and stop positions for each tram station
    MAX_DISTANCE = 50  # meters
    stop_areas = {}
    for el in src:
        # Only tram routes
        if (
            "tags" not in el
            or el["type"] != "relation"
            or el["tags"].get("route") != "tram"
        ):
            continue
        for m in el.get("members", ()):
            member_id = el_id(m)
            pel = elements.get(member_id)
            if pel is None:
                continue
            if not is_part_of_stop(pel["tags"]):
                continue
            if pel["tags"].get("railway") == "tram_stop":
                continue
            # Ways and relations carry coordinates under "center".
            coords = pel.get("center", pel)
            station = stations.search_nn((coords["lon"], coords["lat"]))[
                0
            ].data
            if station.distance(coords) < MAX_DISTANCE:
                k = (
                    station.station["id"],
                    station.station["tags"].get("name", None),
                )
                area = stop_areas.setdefault(
                    k, {el_id(station.station): station.station}
                )
                area[member_id] = pel

    # Find existing stop_area relations for stations and remove these stations
    for el in src:
        if el["type"] != "relation":
            continue
        # A relation may come without tags; treat it as not a stop_area.
        if el.get("tags", {}).get("public_transport") != "stop_area":
            continue
        for m in el.get("members", ()):
            member_id = el_id(m)
            covered = next(
                (k for k, area in stop_areas.items() if member_id in area),
                None,
            )
            if covered is not None:
                # Only the first matching group is dropped per relation.
                del stop_areas[covered]
                break

    # Create OSM XML for new stop_area relations
    root = etree.Element("osm", version="0.6")
    rid = -1
    for st, members in stop_areas.items():
        rel = etree.SubElement(root, "relation", id=str(rid))
        rid -= 1
        etree.SubElement(rel, "tag", k="type", v="public_transport")
        etree.SubElement(rel, "tag", k="public_transport", v="stop_area")
        if st[1]:
            etree.SubElement(rel, "tag", k="name", v=st[1])
        for m in members.values():
            etree.SubElement(
                rel, "member", ref=str(m["id"]), type=m["type"], role=""
            )

    # Add all downloaded elements
    for el in src:
        obj = etree.SubElement(root, el["type"])
        for a in (
            "id",
            "type",
            "user",
            "uid",
            "version",
            "changeset",
            "timestamp",
            "lat",
            "lon",
        ):
            if a in el:
                obj.set(a, str(el[a]))
        if "modified" in el:
            obj.set("action", "modify")
        if "tags" in el:
            for k, v in el["tags"].items():
                etree.SubElement(obj, "tag", k=k, v=v)
        if "members" in el:
            for m in el["members"]:
                etree.SubElement(
                    obj,
                    "member",
                    ref=str(m["ref"]),
                    type=m["type"],
                    role=m.get("role", ""),
                )
        if "nodes" in el:
            for n in el["nodes"]:
                etree.SubElement(obj, "nd", ref=str(n))

    return etree.tostring(root, pretty_print=True, encoding="utf-8")


def main():
    """Command-line entry point: read input, build XML, write or print it."""
    if len(sys.argv) < 2:
        print(
            "Read a JSON from Overpass and output JOSM OSM XML "
            "with added stop_area relations"
        )
        print(
            "Usage: {} {{<export.json>|<bbox>}} [output.osm]".format(
                sys.argv[0]
            )
        )
        sys.exit(1)

    if re.match(r"^[-0-9.,]+$", sys.argv[1]):
        bbox = sys.argv[1].split(",")
        if len(bbox) != 4:
            print("Bounding box must consist of four comma-separated values")
            sys.exit(1)
        # Swap each coordinate pair; Overpass expects south,west,north,east,
        # so the argument is presumably given as west,south,east,north.
        src = overpass_request(",".join([bbox[i] for i in (1, 0, 3, 2)]))
    else:
        # JSON is UTF-8; do not depend on the platform default encoding.
        with open(sys.argv[1], "r", encoding="utf-8") as f:
            src = json.load(f)["elements"]

    result = add_stop_areas(src)

    if len(sys.argv) < 3:
        print(result.decode("utf-8"))
    else:
        with open(sys.argv[2], "wb") as f:
            f.write(result)


if __name__ == "__main__":
    main()